1*8ccd4a63SDavid du Colombier #include "u.h" 2*8ccd4a63SDavid du Colombier #include "lib.h" 3*8ccd4a63SDavid du Colombier #include "dat.h" 4*8ccd4a63SDavid du Colombier #include "fns.h" 5*8ccd4a63SDavid du Colombier #include "error.h" 6*8ccd4a63SDavid du Colombier 7*8ccd4a63SDavid du Colombier static ulong padblockcnt; 8*8ccd4a63SDavid du Colombier static ulong concatblockcnt; 9*8ccd4a63SDavid du Colombier static ulong pullupblockcnt; 10*8ccd4a63SDavid du Colombier static ulong copyblockcnt; 11*8ccd4a63SDavid du Colombier static ulong consumecnt; 12*8ccd4a63SDavid du Colombier static ulong producecnt; 13*8ccd4a63SDavid du Colombier static ulong qcopycnt; 14*8ccd4a63SDavid du Colombier 15*8ccd4a63SDavid du Colombier static int debugging; 16*8ccd4a63SDavid du Colombier 17*8ccd4a63SDavid du Colombier #define QDEBUG if(0) 18*8ccd4a63SDavid du Colombier 19*8ccd4a63SDavid du Colombier /* 20*8ccd4a63SDavid du Colombier * IO queues 21*8ccd4a63SDavid du Colombier */ 22*8ccd4a63SDavid du Colombier struct Queue 23*8ccd4a63SDavid du Colombier { 24*8ccd4a63SDavid du Colombier Lock lk; 25*8ccd4a63SDavid du Colombier 26*8ccd4a63SDavid du Colombier Block* bfirst; /* buffer */ 27*8ccd4a63SDavid du Colombier Block* blast; 28*8ccd4a63SDavid du Colombier 29*8ccd4a63SDavid du Colombier int len; /* bytes allocated to queue */ 30*8ccd4a63SDavid du Colombier int dlen; /* data bytes in queue */ 31*8ccd4a63SDavid du Colombier int limit; /* max bytes in queue */ 32*8ccd4a63SDavid du Colombier int inilim; /* initial limit */ 33*8ccd4a63SDavid du Colombier int state; 34*8ccd4a63SDavid du Colombier int noblock; /* true if writes return immediately when q full */ 35*8ccd4a63SDavid du Colombier int eof; /* number of eofs read by user */ 36*8ccd4a63SDavid du Colombier 37*8ccd4a63SDavid du Colombier void (*kick)(void*); /* restart output */ 38*8ccd4a63SDavid du Colombier void (*bypass)(void*, Block*); /* bypass queue altogether */ 39*8ccd4a63SDavid du Colombier void* arg; /* argument to kick */ 40*8ccd4a63SDavid du Colombier 41*8ccd4a63SDavid du Colombier QLock rlock; /* mutex for reading processes */ 42*8ccd4a63SDavid du Colombier Rendez rr; /* process waiting to read */ 43*8ccd4a63SDavid du Colombier QLock wlock; /* mutex for writing processes */ 44*8ccd4a63SDavid du Colombier Rendez wr; /* process waiting to write */ 45*8ccd4a63SDavid du Colombier 46*8ccd4a63SDavid du Colombier char err[ERRMAX]; 47*8ccd4a63SDavid du Colombier }; 48*8ccd4a63SDavid du Colombier 49*8ccd4a63SDavid du Colombier enum 50*8ccd4a63SDavid du Colombier { 51*8ccd4a63SDavid du Colombier Maxatomic = 64*1024, 52*8ccd4a63SDavid du Colombier }; 53*8ccd4a63SDavid du Colombier 54*8ccd4a63SDavid du Colombier uint qiomaxatomic = Maxatomic; 55*8ccd4a63SDavid du Colombier 56*8ccd4a63SDavid du Colombier void 57*8ccd4a63SDavid du Colombier ixsummary(void) 58*8ccd4a63SDavid du Colombier { 59*8ccd4a63SDavid du Colombier debugging ^= 1; 60*8ccd4a63SDavid du Colombier iallocsummary(); 61*8ccd4a63SDavid du Colombier print("pad %lud, concat %lud, pullup %lud, copy %lud\n", 62*8ccd4a63SDavid du Colombier padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt); 63*8ccd4a63SDavid du Colombier print("consume %lud, produce %lud, qcopy %lud\n", 64*8ccd4a63SDavid du Colombier consumecnt, producecnt, qcopycnt); 65*8ccd4a63SDavid du Colombier } 66*8ccd4a63SDavid du Colombier 67*8ccd4a63SDavid du Colombier /* 68*8ccd4a63SDavid du Colombier * free a list of blocks 69*8ccd4a63SDavid du Colombier */ 70*8ccd4a63SDavid du Colombier void 71*8ccd4a63SDavid du Colombier freeblist(Block *b) 72*8ccd4a63SDavid du Colombier { 73*8ccd4a63SDavid du Colombier Block *next; 74*8ccd4a63SDavid du Colombier 75*8ccd4a63SDavid du Colombier for(; b != 0; b = next){ 76*8ccd4a63SDavid du Colombier next = b->next; 77*8ccd4a63SDavid du Colombier b->next = 0; 78*8ccd4a63SDavid du Colombier freeb(b); 79*8ccd4a63SDavid du Colombier } 80*8ccd4a63SDavid du Colombier } 81*8ccd4a63SDavid du Colombier 82*8ccd4a63SDavid du Colombier /* 83*8ccd4a63SDavid du Colombier * pad a block to the front (or the back if size is negative) 84*8ccd4a63SDavid du Colombier */ 85*8ccd4a63SDavid du Colombier Block* 86*8ccd4a63SDavid du Colombier padblock(Block *bp, int size) 87*8ccd4a63SDavid du Colombier { 88*8ccd4a63SDavid du Colombier int n; 89*8ccd4a63SDavid du Colombier Block *nbp; 90*8ccd4a63SDavid du Colombier 91*8ccd4a63SDavid du Colombier QDEBUG checkb(bp, "padblock 1"); 92*8ccd4a63SDavid du Colombier if(size >= 0){ 93*8ccd4a63SDavid du Colombier if(bp->rp - bp->base >= size){ 94*8ccd4a63SDavid du Colombier bp->rp -= size; 95*8ccd4a63SDavid du Colombier return bp; 96*8ccd4a63SDavid du Colombier } 97*8ccd4a63SDavid du Colombier 98*8ccd4a63SDavid du Colombier if(bp->next) 99*8ccd4a63SDavid du Colombier panic("padblock 0x%luX", getcallerpc(&bp)); 100*8ccd4a63SDavid du Colombier n = BLEN(bp); 101*8ccd4a63SDavid du Colombier padblockcnt++; 102*8ccd4a63SDavid du Colombier nbp = allocb(size+n); 103*8ccd4a63SDavid du Colombier nbp->rp += size; 104*8ccd4a63SDavid du Colombier nbp->wp = nbp->rp; 105*8ccd4a63SDavid du Colombier memmove(nbp->wp, bp->rp, n); 106*8ccd4a63SDavid du Colombier nbp->wp += n; 107*8ccd4a63SDavid du Colombier freeb(bp); 108*8ccd4a63SDavid du Colombier nbp->rp -= size; 109*8ccd4a63SDavid du Colombier } else { 110*8ccd4a63SDavid du Colombier size = -size; 111*8ccd4a63SDavid du Colombier 112*8ccd4a63SDavid du Colombier if(bp->next) 113*8ccd4a63SDavid du Colombier panic("padblock 0x%luX", getcallerpc(&bp)); 114*8ccd4a63SDavid du Colombier 115*8ccd4a63SDavid du Colombier if(bp->lim - bp->wp >= size) 116*8ccd4a63SDavid du Colombier return bp; 117*8ccd4a63SDavid du Colombier 118*8ccd4a63SDavid du Colombier n = BLEN(bp); 119*8ccd4a63SDavid du Colombier padblockcnt++; 120*8ccd4a63SDavid du Colombier nbp = allocb(size+n); 121*8ccd4a63SDavid du Colombier memmove(nbp->wp, bp->rp, n); 122*8ccd4a63SDavid du Colombier nbp->wp += n; 123*8ccd4a63SDavid du Colombier freeb(bp); 124*8ccd4a63SDavid du Colombier } 125*8ccd4a63SDavid du Colombier QDEBUG checkb(nbp, "padblock 1"); 126*8ccd4a63SDavid du Colombier return nbp; 127*8ccd4a63SDavid du Colombier } 128*8ccd4a63SDavid du Colombier 129*8ccd4a63SDavid du Colombier /* 130*8ccd4a63SDavid du Colombier * return count of bytes in a string of blocks 131*8ccd4a63SDavid du Colombier */ 132*8ccd4a63SDavid du Colombier int 133*8ccd4a63SDavid du Colombier blocklen(Block *bp) 134*8ccd4a63SDavid du Colombier { 135*8ccd4a63SDavid du Colombier int len; 136*8ccd4a63SDavid du Colombier 137*8ccd4a63SDavid du Colombier len = 0; 138*8ccd4a63SDavid du Colombier while(bp) { 139*8ccd4a63SDavid du Colombier len += BLEN(bp); 140*8ccd4a63SDavid du Colombier bp = bp->next; 141*8ccd4a63SDavid du Colombier } 142*8ccd4a63SDavid du Colombier return len; 143*8ccd4a63SDavid du Colombier } 144*8ccd4a63SDavid du Colombier 145*8ccd4a63SDavid du Colombier /* 146*8ccd4a63SDavid du Colombier * return count of space in blocks 147*8ccd4a63SDavid du Colombier */ 148*8ccd4a63SDavid du Colombier int 149*8ccd4a63SDavid du Colombier blockalloclen(Block *bp) 150*8ccd4a63SDavid du Colombier { 151*8ccd4a63SDavid du Colombier int len; 152*8ccd4a63SDavid du Colombier 153*8ccd4a63SDavid du Colombier len = 0; 154*8ccd4a63SDavid du Colombier while(bp) { 155*8ccd4a63SDavid du Colombier len += BALLOC(bp); 156*8ccd4a63SDavid du Colombier bp = bp->next; 157*8ccd4a63SDavid du Colombier } 158*8ccd4a63SDavid du Colombier return len; 159*8ccd4a63SDavid du Colombier } 160*8ccd4a63SDavid du Colombier 161*8ccd4a63SDavid du Colombier /* 162*8ccd4a63SDavid du Colombier * copy the string of blocks into 163*8ccd4a63SDavid du Colombier * a single block and free the string 164*8ccd4a63SDavid du Colombier */ 165*8ccd4a63SDavid du Colombier Block* 166*8ccd4a63SDavid du Colombier concatblock(Block *bp) 167*8ccd4a63SDavid du Colombier { 168*8ccd4a63SDavid du Colombier int len; 169*8ccd4a63SDavid du Colombier Block *nb, *f; 170*8ccd4a63SDavid du Colombier 171*8ccd4a63SDavid du Colombier if(bp->next == 0) 172*8ccd4a63SDavid du Colombier return bp; 173*8ccd4a63SDavid du Colombier 174*8ccd4a63SDavid du Colombier nb = allocb(blocklen(bp)); 175*8ccd4a63SDavid du Colombier for(f = bp; f; f = f->next) { 176*8ccd4a63SDavid du Colombier len = BLEN(f); 177*8ccd4a63SDavid du Colombier memmove(nb->wp, f->rp, len); 178*8ccd4a63SDavid du Colombier nb->wp += len; 179*8ccd4a63SDavid du Colombier } 180*8ccd4a63SDavid du Colombier concatblockcnt += BLEN(nb); 181*8ccd4a63SDavid du Colombier freeblist(bp); 182*8ccd4a63SDavid du Colombier QDEBUG checkb(nb, "concatblock 1"); 183*8ccd4a63SDavid du Colombier return nb; 184*8ccd4a63SDavid du Colombier } 185*8ccd4a63SDavid du Colombier 186*8ccd4a63SDavid du Colombier /* 187*8ccd4a63SDavid du Colombier * make sure the first block has at least n bytes 188*8ccd4a63SDavid du Colombier */ 189*8ccd4a63SDavid du Colombier Block* 190*8ccd4a63SDavid du Colombier pullupblock(Block *bp, int n) 191*8ccd4a63SDavid du Colombier { 192*8ccd4a63SDavid du Colombier int i; 193*8ccd4a63SDavid du Colombier Block *nbp; 194*8ccd4a63SDavid du Colombier 195*8ccd4a63SDavid du Colombier /* 196*8ccd4a63SDavid du Colombier * this should almost always be true, it's 197*8ccd4a63SDavid du Colombier * just to avoid every caller checking. 198*8ccd4a63SDavid du Colombier */ 199*8ccd4a63SDavid du Colombier if(BLEN(bp) >= n) 200*8ccd4a63SDavid du Colombier return bp; 201*8ccd4a63SDavid du Colombier 202*8ccd4a63SDavid du Colombier /* 203*8ccd4a63SDavid du Colombier * if not enough room in the first block, 204*8ccd4a63SDavid du Colombier * add another to the front of the list. 205*8ccd4a63SDavid du Colombier */ 206*8ccd4a63SDavid du Colombier if(bp->lim - bp->rp < n){ 207*8ccd4a63SDavid du Colombier nbp = allocb(n); 208*8ccd4a63SDavid du Colombier nbp->next = bp; 209*8ccd4a63SDavid du Colombier bp = nbp; 210*8ccd4a63SDavid du Colombier } 211*8ccd4a63SDavid du Colombier 212*8ccd4a63SDavid du Colombier /* 213*8ccd4a63SDavid du Colombier * copy bytes from the trailing blocks into the first 214*8ccd4a63SDavid du Colombier */ 215*8ccd4a63SDavid du Colombier n -= BLEN(bp); 216*8ccd4a63SDavid du Colombier while((nbp = bp->next)){ 217*8ccd4a63SDavid du Colombier i = BLEN(nbp); 218*8ccd4a63SDavid du Colombier if(i > n) { 219*8ccd4a63SDavid du Colombier memmove(bp->wp, nbp->rp, n); 220*8ccd4a63SDavid du Colombier pullupblockcnt++; 221*8ccd4a63SDavid du Colombier bp->wp += n; 222*8ccd4a63SDavid du Colombier nbp->rp += n; 223*8ccd4a63SDavid du Colombier QDEBUG checkb(bp, "pullupblock 1"); 224*8ccd4a63SDavid du Colombier return bp; 225*8ccd4a63SDavid du Colombier } else { 226*8ccd4a63SDavid du Colombier /* shouldn't happen but why crash if it does */ 227*8ccd4a63SDavid du Colombier if(i < 0){ 228*8ccd4a63SDavid du Colombier print("pullup negative length packet\n"); 229*8ccd4a63SDavid du Colombier i = 0; 230*8ccd4a63SDavid du Colombier } 231*8ccd4a63SDavid du Colombier memmove(bp->wp, nbp->rp, i); 232*8ccd4a63SDavid du Colombier pullupblockcnt++; 233*8ccd4a63SDavid du Colombier bp->wp += i; 234*8ccd4a63SDavid du Colombier bp->next = nbp->next; 235*8ccd4a63SDavid du Colombier nbp->next = 0; 236*8ccd4a63SDavid du Colombier freeb(nbp); 237*8ccd4a63SDavid du Colombier n -= i; 238*8ccd4a63SDavid du Colombier if(n == 0){ 239*8ccd4a63SDavid du Colombier QDEBUG checkb(bp, "pullupblock 2"); 240*8ccd4a63SDavid du Colombier return bp; 241*8ccd4a63SDavid du Colombier } 242*8ccd4a63SDavid du Colombier } 243*8ccd4a63SDavid du Colombier } 244*8ccd4a63SDavid du Colombier freeb(bp); 245*8ccd4a63SDavid du Colombier return 0; 246*8ccd4a63SDavid du Colombier } 247*8ccd4a63SDavid du Colombier 248*8ccd4a63SDavid du Colombier /* 249*8ccd4a63SDavid du Colombier * make sure the first block has at least n bytes 250*8ccd4a63SDavid du Colombier */ 251*8ccd4a63SDavid du Colombier Block* 252*8ccd4a63SDavid du Colombier pullupqueue(Queue *q, int n) 253*8ccd4a63SDavid du Colombier { 254*8ccd4a63SDavid du Colombier Block *b; 255*8ccd4a63SDavid du Colombier 256*8ccd4a63SDavid du Colombier if(BLEN(q->bfirst) >= n) 257*8ccd4a63SDavid du Colombier return q->bfirst; 258*8ccd4a63SDavid du Colombier q->bfirst = pullupblock(q->bfirst, n); 259*8ccd4a63SDavid du Colombier for(b = q->bfirst; b != nil && b->next != nil; b = b->next) 260*8ccd4a63SDavid du Colombier ; 261*8ccd4a63SDavid du Colombier q->blast = b; 262*8ccd4a63SDavid du Colombier return q->bfirst; 263*8ccd4a63SDavid du Colombier } 264*8ccd4a63SDavid du Colombier 265*8ccd4a63SDavid du Colombier /* 266*8ccd4a63SDavid du Colombier * trim to len bytes starting at offset 267*8ccd4a63SDavid du Colombier */ 268*8ccd4a63SDavid du Colombier Block * 269*8ccd4a63SDavid du Colombier trimblock(Block *bp, int offset, int len) 270*8ccd4a63SDavid du Colombier { 271*8ccd4a63SDavid du Colombier ulong l; 272*8ccd4a63SDavid du Colombier Block *nb, *startb; 273*8ccd4a63SDavid du Colombier 274*8ccd4a63SDavid du Colombier QDEBUG checkb(bp, "trimblock 1"); 275*8ccd4a63SDavid du Colombier if(blocklen(bp) < offset+len) { 276*8ccd4a63SDavid du Colombier freeblist(bp); 277*8ccd4a63SDavid du Colombier return nil; 278*8ccd4a63SDavid du Colombier } 279*8ccd4a63SDavid du Colombier 280*8ccd4a63SDavid du Colombier while((l = BLEN(bp)) < offset) { 281*8ccd4a63SDavid du Colombier offset -= l; 282*8ccd4a63SDavid du Colombier nb = bp->next; 283*8ccd4a63SDavid du Colombier bp->next = nil; 284*8ccd4a63SDavid du Colombier freeb(bp); 285*8ccd4a63SDavid du Colombier bp = nb; 286*8ccd4a63SDavid du Colombier } 287*8ccd4a63SDavid du Colombier 288*8ccd4a63SDavid du Colombier startb = bp; 289*8ccd4a63SDavid du Colombier bp->rp += offset; 290*8ccd4a63SDavid du Colombier 291*8ccd4a63SDavid du Colombier while((l = BLEN(bp)) < len) { 292*8ccd4a63SDavid du Colombier len -= l; 293*8ccd4a63SDavid du Colombier bp = bp->next; 294*8ccd4a63SDavid du Colombier } 295*8ccd4a63SDavid du Colombier 296*8ccd4a63SDavid du Colombier bp->wp -= (BLEN(bp) - len); 297*8ccd4a63SDavid du Colombier 298*8ccd4a63SDavid du Colombier if(bp->next) { 299*8ccd4a63SDavid du Colombier freeblist(bp->next); 300*8ccd4a63SDavid du Colombier bp->next = nil; 301*8ccd4a63SDavid du Colombier } 302*8ccd4a63SDavid du Colombier 303*8ccd4a63SDavid du Colombier return startb; 304*8ccd4a63SDavid du Colombier } 305*8ccd4a63SDavid du Colombier 306*8ccd4a63SDavid du Colombier /* 307*8ccd4a63SDavid du Colombier * copy 'count' bytes into a new block 308*8ccd4a63SDavid du Colombier */ 309*8ccd4a63SDavid du Colombier Block* 310*8ccd4a63SDavid du Colombier copyblock(Block *bp, int count) 311*8ccd4a63SDavid du Colombier { 312*8ccd4a63SDavid du Colombier int l; 313*8ccd4a63SDavid du Colombier Block *nbp; 314*8ccd4a63SDavid du Colombier 315*8ccd4a63SDavid du Colombier QDEBUG checkb(bp, "copyblock 0"); 316*8ccd4a63SDavid du Colombier nbp = allocb(count); 317*8ccd4a63SDavid du Colombier for(; count > 0 && bp != 0; bp = bp->next){ 318*8ccd4a63SDavid du Colombier l = BLEN(bp); 319*8ccd4a63SDavid du Colombier if(l > count) 320*8ccd4a63SDavid du Colombier l = count; 321*8ccd4a63SDavid du Colombier memmove(nbp->wp, bp->rp, l); 322*8ccd4a63SDavid du Colombier nbp->wp += l; 323*8ccd4a63SDavid du Colombier count -= l; 324*8ccd4a63SDavid du Colombier } 325*8ccd4a63SDavid du Colombier if(count > 0){ 326*8ccd4a63SDavid du Colombier memset(nbp->wp, 0, count); 327*8ccd4a63SDavid du Colombier nbp->wp += count; 328*8ccd4a63SDavid du Colombier } 329*8ccd4a63SDavid du Colombier copyblockcnt++; 330*8ccd4a63SDavid du Colombier QDEBUG checkb(nbp, "copyblock 1"); 331*8ccd4a63SDavid du Colombier 332*8ccd4a63SDavid du Colombier return nbp; 333*8ccd4a63SDavid du Colombier } 334*8ccd4a63SDavid du Colombier 335*8ccd4a63SDavid du Colombier Block* 336*8ccd4a63SDavid du Colombier adjustblock(Block* bp, int len) 337*8ccd4a63SDavid du Colombier { 338*8ccd4a63SDavid du Colombier int n; 339*8ccd4a63SDavid du Colombier Block *nbp; 340*8ccd4a63SDavid du Colombier 341*8ccd4a63SDavid du Colombier if(len < 0){ 342*8ccd4a63SDavid du Colombier freeb(bp); 343*8ccd4a63SDavid du Colombier return nil; 344*8ccd4a63SDavid du Colombier } 345*8ccd4a63SDavid du Colombier 346*8ccd4a63SDavid du Colombier if(bp->rp+len > bp->lim){ 347*8ccd4a63SDavid du Colombier nbp = copyblock(bp, len); 348*8ccd4a63SDavid du Colombier freeblist(bp); 349*8ccd4a63SDavid du Colombier QDEBUG checkb(nbp, "adjustblock 1"); 350*8ccd4a63SDavid du Colombier 351*8ccd4a63SDavid du Colombier return nbp; 352*8ccd4a63SDavid du Colombier } 353*8ccd4a63SDavid du Colombier 354*8ccd4a63SDavid du Colombier n = BLEN(bp); 355*8ccd4a63SDavid du Colombier if(len > n) 356*8ccd4a63SDavid du Colombier memset(bp->wp, 0, len-n); 357*8ccd4a63SDavid du Colombier bp->wp = bp->rp+len; 358*8ccd4a63SDavid du Colombier QDEBUG checkb(bp, "adjustblock 2"); 359*8ccd4a63SDavid du Colombier 360*8ccd4a63SDavid du Colombier return bp; 361*8ccd4a63SDavid du Colombier } 362*8ccd4a63SDavid du Colombier 363*8ccd4a63SDavid du Colombier 364*8ccd4a63SDavid du Colombier /* 365*8ccd4a63SDavid du Colombier * throw away up to count bytes from a 366*8ccd4a63SDavid du Colombier * list of blocks. Return count of bytes 367*8ccd4a63SDavid du Colombier * thrown away. 368*8ccd4a63SDavid du Colombier */ 369*8ccd4a63SDavid du Colombier int 370*8ccd4a63SDavid du Colombier pullblock(Block **bph, int count) 371*8ccd4a63SDavid du Colombier { 372*8ccd4a63SDavid du Colombier Block *bp; 373*8ccd4a63SDavid du Colombier int n, bytes; 374*8ccd4a63SDavid du Colombier 375*8ccd4a63SDavid du Colombier bytes = 0; 376*8ccd4a63SDavid du Colombier if(bph == nil) 377*8ccd4a63SDavid du Colombier return 0; 378*8ccd4a63SDavid du Colombier 379*8ccd4a63SDavid du Colombier while(*bph != nil && count != 0) { 380*8ccd4a63SDavid du Colombier bp = *bph; 381*8ccd4a63SDavid du Colombier n = BLEN(bp); 382*8ccd4a63SDavid du Colombier if(count < n) 383*8ccd4a63SDavid du Colombier n = count; 384*8ccd4a63SDavid du Colombier bytes += n; 385*8ccd4a63SDavid du Colombier count -= n; 386*8ccd4a63SDavid du Colombier bp->rp += n; 387*8ccd4a63SDavid du Colombier QDEBUG checkb(bp, "pullblock "); 388*8ccd4a63SDavid du Colombier if(BLEN(bp) == 0) { 389*8ccd4a63SDavid du Colombier *bph = bp->next; 390*8ccd4a63SDavid du Colombier bp->next = nil; 391*8ccd4a63SDavid du Colombier freeb(bp); 392*8ccd4a63SDavid du Colombier } 393*8ccd4a63SDavid du Colombier } 394*8ccd4a63SDavid du Colombier return bytes; 395*8ccd4a63SDavid du Colombier } 396*8ccd4a63SDavid du Colombier 397*8ccd4a63SDavid du Colombier /* 398*8ccd4a63SDavid du Colombier * get next block from a queue, return null if nothing there 399*8ccd4a63SDavid du Colombier */ 400*8ccd4a63SDavid du Colombier Block* 401*8ccd4a63SDavid du Colombier qget(Queue *q) 402*8ccd4a63SDavid du Colombier { 403*8ccd4a63SDavid du Colombier int dowakeup; 404*8ccd4a63SDavid du Colombier Block *b; 405*8ccd4a63SDavid du Colombier 406*8ccd4a63SDavid du Colombier /* sync with qwrite */ 407*8ccd4a63SDavid du Colombier ilock(&q->lk); 408*8ccd4a63SDavid du Colombier 409*8ccd4a63SDavid du Colombier b = q->bfirst; 410*8ccd4a63SDavid du Colombier if(b == nil){ 411*8ccd4a63SDavid du Colombier q->state |= Qstarve; 412*8ccd4a63SDavid du Colombier iunlock(&q->lk); 413*8ccd4a63SDavid du Colombier return nil; 414*8ccd4a63SDavid du Colombier } 415*8ccd4a63SDavid du Colombier q->bfirst = b->next; 416*8ccd4a63SDavid du Colombier b->next = 0; 417*8ccd4a63SDavid du Colombier q->len -= BALLOC(b); 418*8ccd4a63SDavid du Colombier q->dlen -= BLEN(b); 419*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qget"); 420*8ccd4a63SDavid du Colombier 421*8ccd4a63SDavid du Colombier /* if writer flow controlled, restart */ 422*8ccd4a63SDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){ 423*8ccd4a63SDavid du Colombier q->state &= ~Qflow; 424*8ccd4a63SDavid du Colombier dowakeup = 1; 425*8ccd4a63SDavid du Colombier } else 426*8ccd4a63SDavid du Colombier dowakeup = 0; 427*8ccd4a63SDavid du Colombier 428*8ccd4a63SDavid du Colombier iunlock(&q->lk); 429*8ccd4a63SDavid du Colombier 430*8ccd4a63SDavid du Colombier if(dowakeup) 431*8ccd4a63SDavid du Colombier wakeup(&q->wr); 432*8ccd4a63SDavid du Colombier 433*8ccd4a63SDavid du Colombier return b; 434*8ccd4a63SDavid du Colombier } 435*8ccd4a63SDavid du Colombier 436*8ccd4a63SDavid du Colombier /* 437*8ccd4a63SDavid du Colombier * throw away the next 'len' bytes in the queue 438*8ccd4a63SDavid du Colombier */ 439*8ccd4a63SDavid du Colombier int 440*8ccd4a63SDavid du Colombier qdiscard(Queue *q, int len) 441*8ccd4a63SDavid du Colombier { 442*8ccd4a63SDavid du Colombier Block *b; 443*8ccd4a63SDavid du Colombier int dowakeup, n, sofar; 444*8ccd4a63SDavid du Colombier 445*8ccd4a63SDavid du Colombier ilock(&q->lk); 446*8ccd4a63SDavid du Colombier for(sofar = 0; sofar < len; sofar += n){ 447*8ccd4a63SDavid du Colombier b = q->bfirst; 448*8ccd4a63SDavid du Colombier if(b == nil) 449*8ccd4a63SDavid du Colombier break; 450*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qdiscard"); 451*8ccd4a63SDavid du Colombier n = BLEN(b); 452*8ccd4a63SDavid du Colombier if(n <= len - sofar){ 453*8ccd4a63SDavid du Colombier q->bfirst = b->next; 454*8ccd4a63SDavid du Colombier b->next = 0; 455*8ccd4a63SDavid du Colombier q->len -= BALLOC(b); 456*8ccd4a63SDavid du Colombier q->dlen -= BLEN(b); 457*8ccd4a63SDavid du Colombier freeb(b); 458*8ccd4a63SDavid du Colombier } else { 459*8ccd4a63SDavid du Colombier n = len - sofar; 460*8ccd4a63SDavid du Colombier b->rp += n; 461*8ccd4a63SDavid du Colombier q->dlen -= n; 462*8ccd4a63SDavid du Colombier } 463*8ccd4a63SDavid du Colombier } 464*8ccd4a63SDavid du Colombier 465*8ccd4a63SDavid du Colombier /* 466*8ccd4a63SDavid du Colombier * if writer flow controlled, restart 467*8ccd4a63SDavid du Colombier * 468*8ccd4a63SDavid du Colombier * This used to be 469*8ccd4a63SDavid du Colombier * q->len < q->limit/2 470*8ccd4a63SDavid du Colombier * but it slows down tcp too much for certain write sizes. 471*8ccd4a63SDavid du Colombier * I really don't understand it completely. It may be 472*8ccd4a63SDavid du Colombier * due to the queue draining so fast that the transmission 473*8ccd4a63SDavid du Colombier * stalls waiting for the app to produce more data. - presotto 474*8ccd4a63SDavid du Colombier */ 475*8ccd4a63SDavid du Colombier if((q->state & Qflow) && q->len < q->limit){ 476*8ccd4a63SDavid du Colombier q->state &= ~Qflow; 477*8ccd4a63SDavid du Colombier dowakeup = 1; 478*8ccd4a63SDavid du Colombier } else 479*8ccd4a63SDavid du Colombier dowakeup = 0; 480*8ccd4a63SDavid du Colombier 481*8ccd4a63SDavid du Colombier iunlock(&q->lk); 482*8ccd4a63SDavid du Colombier 483*8ccd4a63SDavid du Colombier if(dowakeup) 484*8ccd4a63SDavid du Colombier wakeup(&q->wr); 485*8ccd4a63SDavid du Colombier 486*8ccd4a63SDavid du Colombier return sofar; 487*8ccd4a63SDavid du Colombier } 488*8ccd4a63SDavid du Colombier 489*8ccd4a63SDavid du Colombier /* 490*8ccd4a63SDavid du Colombier * Interrupt level copy out of a queue, return # bytes copied. 491*8ccd4a63SDavid du Colombier */ 492*8ccd4a63SDavid du Colombier int 493*8ccd4a63SDavid du Colombier qconsume(Queue *q, void *vp, int len) 494*8ccd4a63SDavid du Colombier { 495*8ccd4a63SDavid du Colombier Block *b; 496*8ccd4a63SDavid du Colombier int n, dowakeup; 497*8ccd4a63SDavid du Colombier uchar *p = vp; 498*8ccd4a63SDavid du Colombier Block *tofree = nil; 499*8ccd4a63SDavid du Colombier 500*8ccd4a63SDavid du Colombier /* sync with qwrite */ 501*8ccd4a63SDavid du Colombier ilock(&q->lk); 502*8ccd4a63SDavid du Colombier 503*8ccd4a63SDavid du Colombier for(;;) { 504*8ccd4a63SDavid du Colombier b = q->bfirst; 505*8ccd4a63SDavid du Colombier if(b == 0){ 506*8ccd4a63SDavid du Colombier q->state |= Qstarve; 507*8ccd4a63SDavid du Colombier iunlock(&q->lk); 508*8ccd4a63SDavid du Colombier return -1; 509*8ccd4a63SDavid du Colombier } 510*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qconsume 1"); 511*8ccd4a63SDavid du Colombier 512*8ccd4a63SDavid du Colombier n = BLEN(b); 513*8ccd4a63SDavid du Colombier if(n > 0) 514*8ccd4a63SDavid du Colombier break; 515*8ccd4a63SDavid du Colombier q->bfirst = b->next; 516*8ccd4a63SDavid du Colombier q->len -= BALLOC(b); 517*8ccd4a63SDavid du Colombier 518*8ccd4a63SDavid du Colombier /* remember to free this */ 519*8ccd4a63SDavid du Colombier b->next = tofree; 520*8ccd4a63SDavid du Colombier tofree = b; 521*8ccd4a63SDavid du Colombier }; 522*8ccd4a63SDavid du Colombier 523*8ccd4a63SDavid du Colombier if(n < len) 524*8ccd4a63SDavid du Colombier len = n; 525*8ccd4a63SDavid du Colombier memmove(p, b->rp, len); 526*8ccd4a63SDavid du Colombier consumecnt += n; 527*8ccd4a63SDavid du Colombier b->rp += len; 528*8ccd4a63SDavid du Colombier q->dlen -= len; 529*8ccd4a63SDavid du Colombier 530*8ccd4a63SDavid du Colombier /* discard the block if we're done with it */ 531*8ccd4a63SDavid du Colombier if((q->state & Qmsg) || len == n){ 532*8ccd4a63SDavid du Colombier q->bfirst = b->next; 533*8ccd4a63SDavid du Colombier b->next = 0; 534*8ccd4a63SDavid du Colombier q->len -= BALLOC(b); 535*8ccd4a63SDavid du Colombier q->dlen -= BLEN(b); 536*8ccd4a63SDavid du Colombier 537*8ccd4a63SDavid du Colombier /* remember to free this */ 538*8ccd4a63SDavid du Colombier b->next = tofree; 539*8ccd4a63SDavid du Colombier tofree = b; 540*8ccd4a63SDavid du Colombier } 541*8ccd4a63SDavid du Colombier 542*8ccd4a63SDavid du Colombier /* if writer flow controlled, restart */ 543*8ccd4a63SDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){ 544*8ccd4a63SDavid du Colombier q->state &= ~Qflow; 545*8ccd4a63SDavid du Colombier dowakeup = 1; 546*8ccd4a63SDavid du Colombier } else 547*8ccd4a63SDavid du Colombier dowakeup = 0; 548*8ccd4a63SDavid du Colombier 549*8ccd4a63SDavid du Colombier iunlock(&q->lk); 550*8ccd4a63SDavid du Colombier 551*8ccd4a63SDavid du Colombier if(dowakeup) 552*8ccd4a63SDavid du Colombier wakeup(&q->wr); 553*8ccd4a63SDavid du Colombier 554*8ccd4a63SDavid du Colombier if(tofree != nil) 555*8ccd4a63SDavid du Colombier freeblist(tofree); 556*8ccd4a63SDavid du Colombier 557*8ccd4a63SDavid du Colombier return len; 558*8ccd4a63SDavid du Colombier } 559*8ccd4a63SDavid du Colombier 560*8ccd4a63SDavid du Colombier int 561*8ccd4a63SDavid du Colombier qpass(Queue *q, Block *b) 562*8ccd4a63SDavid du Colombier { 563*8ccd4a63SDavid du Colombier int dlen, len, dowakeup; 564*8ccd4a63SDavid du Colombier 565*8ccd4a63SDavid du Colombier /* sync with qread */ 566*8ccd4a63SDavid du Colombier dowakeup = 0; 567*8ccd4a63SDavid du Colombier ilock(&q->lk); 568*8ccd4a63SDavid du Colombier if(q->len >= q->limit){ 569*8ccd4a63SDavid du Colombier freeblist(b); 570*8ccd4a63SDavid du Colombier iunlock(&q->lk); 571*8ccd4a63SDavid du Colombier return -1; 572*8ccd4a63SDavid du Colombier } 573*8ccd4a63SDavid du Colombier if(q->state & Qclosed){ 574*8ccd4a63SDavid du Colombier freeblist(b); 575*8ccd4a63SDavid du Colombier iunlock(&q->lk); 576*8ccd4a63SDavid du Colombier return BALLOC(b); 577*8ccd4a63SDavid du Colombier } 578*8ccd4a63SDavid du Colombier 579*8ccd4a63SDavid du Colombier /* add buffer to queue */ 580*8ccd4a63SDavid du Colombier if(q->bfirst) 581*8ccd4a63SDavid du Colombier q->blast->next = b; 582*8ccd4a63SDavid du Colombier else 583*8ccd4a63SDavid du Colombier q->bfirst = b; 584*8ccd4a63SDavid du Colombier len = BALLOC(b); 585*8ccd4a63SDavid du Colombier dlen = BLEN(b); 586*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qpass"); 587*8ccd4a63SDavid du Colombier while(b->next){ 588*8ccd4a63SDavid du Colombier b = b->next; 589*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qpass"); 590*8ccd4a63SDavid du Colombier len += BALLOC(b); 591*8ccd4a63SDavid du Colombier dlen += BLEN(b); 592*8ccd4a63SDavid du Colombier } 593*8ccd4a63SDavid du Colombier q->blast = b; 594*8ccd4a63SDavid du Colombier q->len += len; 595*8ccd4a63SDavid du Colombier q->dlen += dlen; 596*8ccd4a63SDavid du Colombier 597*8ccd4a63SDavid du Colombier if(q->len >= q->limit/2) 598*8ccd4a63SDavid du Colombier q->state |= Qflow; 599*8ccd4a63SDavid du Colombier 600*8ccd4a63SDavid du Colombier if(q->state & Qstarve){ 601*8ccd4a63SDavid du Colombier q->state &= ~Qstarve; 602*8ccd4a63SDavid du Colombier dowakeup = 1; 603*8ccd4a63SDavid du Colombier } 604*8ccd4a63SDavid du Colombier iunlock(&q->lk); 605*8ccd4a63SDavid du Colombier 606*8ccd4a63SDavid du Colombier if(dowakeup) 607*8ccd4a63SDavid du Colombier wakeup(&q->rr); 608*8ccd4a63SDavid du Colombier 609*8ccd4a63SDavid du Colombier return len; 610*8ccd4a63SDavid du Colombier } 611*8ccd4a63SDavid du Colombier 612*8ccd4a63SDavid du Colombier int 613*8ccd4a63SDavid du Colombier qpassnolim(Queue *q, Block *b) 614*8ccd4a63SDavid du Colombier { 615*8ccd4a63SDavid du Colombier int dlen, len, dowakeup; 616*8ccd4a63SDavid du Colombier 617*8ccd4a63SDavid du Colombier /* sync with qread */ 618*8ccd4a63SDavid du Colombier dowakeup = 0; 619*8ccd4a63SDavid du Colombier ilock(&q->lk); 620*8ccd4a63SDavid du Colombier 621*8ccd4a63SDavid du Colombier if(q->state & Qclosed){ 622*8ccd4a63SDavid du Colombier freeblist(b); 623*8ccd4a63SDavid du Colombier iunlock(&q->lk); 624*8ccd4a63SDavid du Colombier return BALLOC(b); 625*8ccd4a63SDavid du Colombier } 626*8ccd4a63SDavid du Colombier 627*8ccd4a63SDavid du Colombier /* add buffer to queue */ 628*8ccd4a63SDavid du Colombier if(q->bfirst) 629*8ccd4a63SDavid du Colombier q->blast->next = b; 630*8ccd4a63SDavid du Colombier else 631*8ccd4a63SDavid du Colombier q->bfirst = b; 632*8ccd4a63SDavid du Colombier len = BALLOC(b); 633*8ccd4a63SDavid du Colombier dlen = BLEN(b); 634*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qpass"); 635*8ccd4a63SDavid du Colombier while(b->next){ 636*8ccd4a63SDavid du Colombier b = b->next; 637*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qpass"); 638*8ccd4a63SDavid du Colombier len += BALLOC(b); 639*8ccd4a63SDavid du Colombier dlen += BLEN(b); 640*8ccd4a63SDavid du Colombier } 641*8ccd4a63SDavid du Colombier q->blast = b; 642*8ccd4a63SDavid du Colombier q->len += len; 643*8ccd4a63SDavid du Colombier q->dlen += dlen; 644*8ccd4a63SDavid du Colombier 645*8ccd4a63SDavid du Colombier if(q->len >= q->limit/2) 646*8ccd4a63SDavid du Colombier q->state |= Qflow; 647*8ccd4a63SDavid du Colombier 648*8ccd4a63SDavid du Colombier if(q->state & Qstarve){ 649*8ccd4a63SDavid du Colombier q->state &= ~Qstarve; 650*8ccd4a63SDavid du Colombier dowakeup = 1; 651*8ccd4a63SDavid du Colombier } 652*8ccd4a63SDavid du Colombier iunlock(&q->lk); 653*8ccd4a63SDavid du Colombier 654*8ccd4a63SDavid du Colombier if(dowakeup) 655*8ccd4a63SDavid du Colombier wakeup(&q->rr); 656*8ccd4a63SDavid du Colombier 657*8ccd4a63SDavid du Colombier return len; 658*8ccd4a63SDavid du Colombier } 659*8ccd4a63SDavid du Colombier 660*8ccd4a63SDavid du Colombier /* 661*8ccd4a63SDavid du Colombier * if the allocated space is way out of line with the used 662*8ccd4a63SDavid du Colombier * space, reallocate to a smaller block 663*8ccd4a63SDavid du Colombier */ 664*8ccd4a63SDavid du Colombier Block* 665*8ccd4a63SDavid du Colombier packblock(Block *bp) 666*8ccd4a63SDavid du Colombier { 667*8ccd4a63SDavid du Colombier Block **l, *nbp; 668*8ccd4a63SDavid du Colombier int n; 669*8ccd4a63SDavid du Colombier 670*8ccd4a63SDavid du Colombier for(l = &bp; *l; l = &(*l)->next){ 671*8ccd4a63SDavid du Colombier nbp = *l; 672*8ccd4a63SDavid du Colombier n = BLEN(nbp); 673*8ccd4a63SDavid du Colombier if((n<<2) < BALLOC(nbp)){ 674*8ccd4a63SDavid du Colombier *l = allocb(n); 675*8ccd4a63SDavid du Colombier memmove((*l)->wp, nbp->rp, n); 676*8ccd4a63SDavid du Colombier (*l)->wp += n; 677*8ccd4a63SDavid du Colombier (*l)->next = nbp->next; 678*8ccd4a63SDavid du Colombier freeb(nbp); 679*8ccd4a63SDavid du Colombier } 680*8ccd4a63SDavid du Colombier } 681*8ccd4a63SDavid du Colombier 682*8ccd4a63SDavid du Colombier return bp; 683*8ccd4a63SDavid du Colombier } 684*8ccd4a63SDavid du Colombier 685*8ccd4a63SDavid du Colombier int 686*8ccd4a63SDavid du Colombier qproduce(Queue *q, void *vp, int len) 687*8ccd4a63SDavid du Colombier { 688*8ccd4a63SDavid du Colombier Block *b; 689*8ccd4a63SDavid du Colombier int dowakeup; 690*8ccd4a63SDavid du Colombier uchar *p = vp; 691*8ccd4a63SDavid du Colombier 692*8ccd4a63SDavid du Colombier /* sync with qread */ 693*8ccd4a63SDavid du Colombier dowakeup = 0; 694*8ccd4a63SDavid du Colombier ilock(&q->lk); 695*8ccd4a63SDavid du Colombier 696*8ccd4a63SDavid du Colombier /* no waiting receivers, room in buffer? */ 697*8ccd4a63SDavid du Colombier if(q->len >= q->limit){ 698*8ccd4a63SDavid du Colombier q->state |= Qflow; 699*8ccd4a63SDavid du Colombier iunlock(&q->lk); 700*8ccd4a63SDavid du Colombier return -1; 701*8ccd4a63SDavid du Colombier } 702*8ccd4a63SDavid du Colombier 703*8ccd4a63SDavid du Colombier /* save in buffer */ 704*8ccd4a63SDavid du Colombier b = iallocb(len); 705*8ccd4a63SDavid du Colombier if(b == 0){ 706*8ccd4a63SDavid du Colombier iunlock(&q->lk); 707*8ccd4a63SDavid du Colombier return 0; 708*8ccd4a63SDavid du Colombier } 709*8ccd4a63SDavid du Colombier memmove(b->wp, p, len); 710*8ccd4a63SDavid du Colombier producecnt += len; 711*8ccd4a63SDavid du Colombier b->wp += len; 712*8ccd4a63SDavid du Colombier if(q->bfirst) 713*8ccd4a63SDavid du Colombier q->blast->next = b; 714*8ccd4a63SDavid du Colombier else 715*8ccd4a63SDavid du Colombier q->bfirst = b; 716*8ccd4a63SDavid du Colombier q->blast = b; 717*8ccd4a63SDavid du Colombier /* b->next = 0; done by iallocb() */ 718*8ccd4a63SDavid du Colombier q->len += BALLOC(b); 719*8ccd4a63SDavid du Colombier q->dlen += BLEN(b); 720*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qproduce"); 721*8ccd4a63SDavid du Colombier 722*8ccd4a63SDavid du Colombier if(q->state & Qstarve){ 723*8ccd4a63SDavid du Colombier q->state &= ~Qstarve; 724*8ccd4a63SDavid du Colombier dowakeup = 1; 725*8ccd4a63SDavid du Colombier } 726*8ccd4a63SDavid du Colombier 727*8ccd4a63SDavid du Colombier if(q->len >= q->limit) 728*8ccd4a63SDavid du Colombier q->state |= Qflow; 729*8ccd4a63SDavid du Colombier iunlock(&q->lk); 730*8ccd4a63SDavid du Colombier 731*8ccd4a63SDavid du Colombier if(dowakeup) 732*8ccd4a63SDavid du Colombier wakeup(&q->rr); 733*8ccd4a63SDavid du Colombier 734*8ccd4a63SDavid du Colombier return len; 735*8ccd4a63SDavid du Colombier } 736*8ccd4a63SDavid du Colombier 737*8ccd4a63SDavid du Colombier /* 738*8ccd4a63SDavid du Colombier * copy from offset in the queue 739*8ccd4a63SDavid du Colombier */ 740*8ccd4a63SDavid du Colombier Block* 741*8ccd4a63SDavid du Colombier qcopy(Queue *q, int len, ulong offset) 742*8ccd4a63SDavid du Colombier { 743*8ccd4a63SDavid du Colombier int sofar; 744*8ccd4a63SDavid du Colombier int n; 745*8ccd4a63SDavid du Colombier Block *b, *nb; 746*8ccd4a63SDavid du Colombier uchar *p; 747*8ccd4a63SDavid du Colombier 748*8ccd4a63SDavid du Colombier nb = allocb(len); 749*8ccd4a63SDavid du Colombier 750*8ccd4a63SDavid du Colombier ilock(&q->lk); 751*8ccd4a63SDavid du Colombier 752*8ccd4a63SDavid du Colombier /* go to offset */ 753*8ccd4a63SDavid du Colombier b = q->bfirst; 754*8ccd4a63SDavid du Colombier for(sofar = 0; ; sofar += n){ 755*8ccd4a63SDavid du Colombier if(b == nil){ 756*8ccd4a63SDavid du Colombier iunlock(&q->lk); 757*8ccd4a63SDavid du Colombier return nb; 758*8ccd4a63SDavid du Colombier } 759*8ccd4a63SDavid du Colombier n = BLEN(b); 760*8ccd4a63SDavid du Colombier if(sofar + n > offset){ 761*8ccd4a63SDavid du Colombier p = b->rp + offset - sofar; 762*8ccd4a63SDavid du Colombier n -= offset - sofar; 763*8ccd4a63SDavid du Colombier break; 764*8ccd4a63SDavid du Colombier } 765*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qcopy"); 766*8ccd4a63SDavid du Colombier b = b->next; 767*8ccd4a63SDavid du Colombier } 768*8ccd4a63SDavid du Colombier 769*8ccd4a63SDavid du Colombier /* copy bytes from there */ 770*8ccd4a63SDavid du Colombier for(sofar = 0; sofar < len;){ 771*8ccd4a63SDavid du Colombier if(n > len - sofar) 772*8ccd4a63SDavid du Colombier n = len - sofar; 773*8ccd4a63SDavid du Colombier memmove(nb->wp, p, n); 774*8ccd4a63SDavid du Colombier qcopycnt += n; 775*8ccd4a63SDavid du Colombier sofar += n; 776*8ccd4a63SDavid du Colombier nb->wp += n; 777*8ccd4a63SDavid du Colombier b = b->next; 778*8ccd4a63SDavid du Colombier if(b == nil) 779*8ccd4a63SDavid du Colombier break; 780*8ccd4a63SDavid du Colombier n = BLEN(b); 781*8ccd4a63SDavid du Colombier p = b->rp; 782*8ccd4a63SDavid du Colombier } 783*8ccd4a63SDavid du Colombier iunlock(&q->lk); 784*8ccd4a63SDavid du Colombier 785*8ccd4a63SDavid du Colombier return nb; 786*8ccd4a63SDavid du Colombier } 787*8ccd4a63SDavid du Colombier 788*8ccd4a63SDavid du Colombier /* 789*8ccd4a63SDavid du Colombier * called by non-interrupt code 790*8ccd4a63SDavid du Colombier */ 791*8ccd4a63SDavid du Colombier Queue* 792*8ccd4a63SDavid du Colombier qopen(int limit, int msg, void (*kick)(void*), void *arg) 793*8ccd4a63SDavid du Colombier { 794*8ccd4a63SDavid du Colombier Queue *q; 795*8ccd4a63SDavid du Colombier 796*8ccd4a63SDavid du Colombier q = malloc(sizeof(Queue)); 797*8ccd4a63SDavid du Colombier if(q == 0) 798*8ccd4a63SDavid du Colombier return 0; 799*8ccd4a63SDavid du Colombier 800*8ccd4a63SDavid du Colombier q->limit = q->inilim = limit; 801*8ccd4a63SDavid du Colombier q->kick = kick; 802*8ccd4a63SDavid du Colombier q->arg = arg; 803*8ccd4a63SDavid du Colombier q->state = msg; 804*8ccd4a63SDavid du Colombier 805*8ccd4a63SDavid du Colombier q->state |= Qstarve; 806*8ccd4a63SDavid du Colombier q->eof = 0; 807*8ccd4a63SDavid du Colombier q->noblock = 0; 808*8ccd4a63SDavid du Colombier 809*8ccd4a63SDavid du Colombier return q; 810*8ccd4a63SDavid du Colombier } 811*8ccd4a63SDavid du Colombier 812*8ccd4a63SDavid du Colombier /* open a queue to be bypassed */ 813*8ccd4a63SDavid du Colombier Queue* 814*8ccd4a63SDavid du Colombier qbypass(void (*bypass)(void*, Block*), void *arg) 815*8ccd4a63SDavid du Colombier { 816*8ccd4a63SDavid du Colombier Queue *q; 817*8ccd4a63SDavid du Colombier 818*8ccd4a63SDavid du Colombier q = malloc(sizeof(Queue)); 819*8ccd4a63SDavid du Colombier if(q == 0) 820*8ccd4a63SDavid du Colombier return 0; 821*8ccd4a63SDavid du Colombier 822*8ccd4a63SDavid du Colombier q->limit = 0; 823*8ccd4a63SDavid du Colombier q->arg = arg; 824*8ccd4a63SDavid du Colombier q->bypass = bypass; 825*8ccd4a63SDavid du Colombier q->state = 0; 826*8ccd4a63SDavid du Colombier 827*8ccd4a63SDavid du Colombier return q; 828*8ccd4a63SDavid du Colombier } 829*8ccd4a63SDavid du Colombier 830*8ccd4a63SDavid du Colombier static int 831*8ccd4a63SDavid du Colombier notempty(void *a) 832*8ccd4a63SDavid du Colombier { 833*8ccd4a63SDavid du Colombier Queue *q = a; 834*8ccd4a63SDavid du Colombier 835*8ccd4a63SDavid du Colombier return (q->state & Qclosed) || q->bfirst != 0; 836*8ccd4a63SDavid du Colombier } 837*8ccd4a63SDavid du Colombier 838*8ccd4a63SDavid du Colombier /* 839*8ccd4a63SDavid du Colombier * wait for the queue to be non-empty or closed. 840*8ccd4a63SDavid du Colombier * called with q ilocked. 841*8ccd4a63SDavid du Colombier */ 842*8ccd4a63SDavid du Colombier static int 843*8ccd4a63SDavid du Colombier qwait(Queue *q) 844*8ccd4a63SDavid du Colombier { 845*8ccd4a63SDavid du Colombier /* wait for data */ 846*8ccd4a63SDavid du Colombier for(;;){ 847*8ccd4a63SDavid du Colombier if(q->bfirst != nil) 848*8ccd4a63SDavid du Colombier break; 849*8ccd4a63SDavid du Colombier 850*8ccd4a63SDavid du Colombier if(q->state & Qclosed){ 851*8ccd4a63SDavid du Colombier if(++q->eof > 3) 852*8ccd4a63SDavid du Colombier return -1; 853*8ccd4a63SDavid du Colombier if(*q->err && strcmp(q->err, Ehungup) != 0) 854*8ccd4a63SDavid du Colombier return -1; 855*8ccd4a63SDavid du Colombier return 0; 856*8ccd4a63SDavid du Colombier } 857*8ccd4a63SDavid du Colombier 858*8ccd4a63SDavid du Colombier q->state |= Qstarve; /* flag requesting producer to wake me */ 859*8ccd4a63SDavid du Colombier iunlock(&q->lk); 860*8ccd4a63SDavid du Colombier sleep(&q->rr, notempty, q); 861*8ccd4a63SDavid du Colombier ilock(&q->lk); 862*8ccd4a63SDavid du Colombier } 863*8ccd4a63SDavid du Colombier return 1; 864*8ccd4a63SDavid du Colombier } 865*8ccd4a63SDavid du Colombier 866*8ccd4a63SDavid du Colombier /* 867*8ccd4a63SDavid du Colombier * add a block list to a queue 868*8ccd4a63SDavid du Colombier */ 869*8ccd4a63SDavid du Colombier void 870*8ccd4a63SDavid du Colombier qaddlist(Queue *q, Block *b) 871*8ccd4a63SDavid du Colombier { 872*8ccd4a63SDavid du Colombier /* queue the block */ 873*8ccd4a63SDavid du Colombier if(q->bfirst) 874*8ccd4a63SDavid du Colombier q->blast->next = b; 875*8ccd4a63SDavid du Colombier else 876*8ccd4a63SDavid du Colombier q->bfirst = b; 877*8ccd4a63SDavid du Colombier q->len += blockalloclen(b); 878*8ccd4a63SDavid du Colombier q->dlen += blocklen(b); 879*8ccd4a63SDavid du Colombier while(b->next) 880*8ccd4a63SDavid du Colombier b = b->next; 881*8ccd4a63SDavid du Colombier q->blast = b; 882*8ccd4a63SDavid du Colombier } 883*8ccd4a63SDavid du Colombier 884*8ccd4a63SDavid du Colombier /* 885*8ccd4a63SDavid du Colombier * called with q ilocked 886*8ccd4a63SDavid du Colombier */ 887*8ccd4a63SDavid du Colombier Block* 888*8ccd4a63SDavid du Colombier qremove(Queue *q) 889*8ccd4a63SDavid du Colombier { 890*8ccd4a63SDavid du Colombier Block *b; 891*8ccd4a63SDavid du Colombier 892*8ccd4a63SDavid du Colombier b = q->bfirst; 893*8ccd4a63SDavid du Colombier if(b == nil) 894*8ccd4a63SDavid du Colombier return nil; 895*8ccd4a63SDavid du Colombier q->bfirst = b->next; 896*8ccd4a63SDavid du Colombier b->next = nil; 897*8ccd4a63SDavid du Colombier q->dlen -= BLEN(b); 898*8ccd4a63SDavid du Colombier q->len -= BALLOC(b); 899*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qremove"); 900*8ccd4a63SDavid du Colombier return b; 901*8ccd4a63SDavid du Colombier } 902*8ccd4a63SDavid du Colombier 903*8ccd4a63SDavid du Colombier /* 904*8ccd4a63SDavid du Colombier * copy the contents of a string of blocks into 905*8ccd4a63SDavid du Colombier * memory. emptied blocks are freed. return 906*8ccd4a63SDavid du Colombier * pointer to first unconsumed block. 907*8ccd4a63SDavid du Colombier */ 908*8ccd4a63SDavid du Colombier Block* 909*8ccd4a63SDavid du Colombier bl2mem(uchar *p, Block *b, int n) 910*8ccd4a63SDavid du Colombier { 911*8ccd4a63SDavid du Colombier int i; 912*8ccd4a63SDavid du Colombier Block *next; 913*8ccd4a63SDavid du Colombier 914*8ccd4a63SDavid du Colombier for(; b != nil; b = next){ 915*8ccd4a63SDavid du Colombier i = BLEN(b); 916*8ccd4a63SDavid du Colombier if(i > n){ 917*8ccd4a63SDavid du Colombier memmove(p, b->rp, n); 918*8ccd4a63SDavid du Colombier b->rp += n; 919*8ccd4a63SDavid du Colombier return b; 920*8ccd4a63SDavid du Colombier } 921*8ccd4a63SDavid du Colombier memmove(p, b->rp, i); 922*8ccd4a63SDavid du Colombier n -= i; 923*8ccd4a63SDavid du Colombier p += i; 924*8ccd4a63SDavid du Colombier b->rp += i; 925*8ccd4a63SDavid du Colombier next = b->next; 926*8ccd4a63SDavid du Colombier freeb(b); 927*8ccd4a63SDavid du Colombier } 928*8ccd4a63SDavid du Colombier return nil; 929*8ccd4a63SDavid du Colombier } 930*8ccd4a63SDavid du Colombier 931*8ccd4a63SDavid du Colombier /* 932*8ccd4a63SDavid du Colombier * copy the contents of memory into a string of blocks. 933*8ccd4a63SDavid du Colombier * return nil on error. 934*8ccd4a63SDavid du Colombier */ 935*8ccd4a63SDavid du Colombier Block* 936*8ccd4a63SDavid du Colombier mem2bl(uchar *p, int len) 937*8ccd4a63SDavid du Colombier { 938*8ccd4a63SDavid du Colombier int n; 939*8ccd4a63SDavid du Colombier Block *b, *first, **l; 940*8ccd4a63SDavid du Colombier 941*8ccd4a63SDavid du Colombier first = nil; 942*8ccd4a63SDavid du Colombier l = &first; 943*8ccd4a63SDavid du Colombier if(waserror()){ 944*8ccd4a63SDavid du Colombier freeblist(first); 945*8ccd4a63SDavid du Colombier nexterror(); 946*8ccd4a63SDavid du Colombier } 947*8ccd4a63SDavid du Colombier do { 948*8ccd4a63SDavid du Colombier n = len; 949*8ccd4a63SDavid du Colombier if(n > Maxatomic) 950*8ccd4a63SDavid du Colombier n = Maxatomic; 951*8ccd4a63SDavid du Colombier 952*8ccd4a63SDavid du Colombier *l = b = allocb(n); 953*8ccd4a63SDavid du Colombier /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */ 954*8ccd4a63SDavid du Colombier memmove(b->wp, p, n); 955*8ccd4a63SDavid du Colombier b->wp += n; 956*8ccd4a63SDavid du Colombier p += n; 957*8ccd4a63SDavid du Colombier len -= n; 958*8ccd4a63SDavid du Colombier l = &b->next; 959*8ccd4a63SDavid du Colombier } while(len > 0); 960*8ccd4a63SDavid du Colombier poperror(); 961*8ccd4a63SDavid du Colombier 962*8ccd4a63SDavid du Colombier return first; 963*8ccd4a63SDavid du Colombier } 964*8ccd4a63SDavid du Colombier 965*8ccd4a63SDavid du Colombier /* 966*8ccd4a63SDavid du Colombier * put a block back to the front of the queue 967*8ccd4a63SDavid du Colombier * called with q ilocked 968*8ccd4a63SDavid du Colombier */ 969*8ccd4a63SDavid du Colombier void 970*8ccd4a63SDavid du Colombier qputback(Queue *q, Block *b) 971*8ccd4a63SDavid du Colombier { 972*8ccd4a63SDavid du Colombier b->next = q->bfirst; 973*8ccd4a63SDavid du Colombier if(q->bfirst == nil) 974*8ccd4a63SDavid du Colombier q->blast = b; 975*8ccd4a63SDavid du Colombier q->bfirst = b; 976*8ccd4a63SDavid du Colombier q->len += BALLOC(b); 977*8ccd4a63SDavid du Colombier q->dlen += BLEN(b); 978*8ccd4a63SDavid du Colombier } 979*8ccd4a63SDavid du Colombier 980*8ccd4a63SDavid du Colombier /* 981*8ccd4a63SDavid du Colombier * flow control, get producer going again 982*8ccd4a63SDavid du Colombier * called with q ilocked 983*8ccd4a63SDavid du Colombier */ 984*8ccd4a63SDavid du Colombier static void 985*8ccd4a63SDavid du Colombier qwakeup_iunlock(Queue *q) 986*8ccd4a63SDavid du Colombier { 987*8ccd4a63SDavid du Colombier int dowakeup = 0; 988*8ccd4a63SDavid du Colombier 989*8ccd4a63SDavid du Colombier /* if writer flow controlled, restart */ 990*8ccd4a63SDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){ 991*8ccd4a63SDavid du Colombier q->state &= ~Qflow; 992*8ccd4a63SDavid du Colombier dowakeup = 1; 993*8ccd4a63SDavid du Colombier } 994*8ccd4a63SDavid du Colombier 995*8ccd4a63SDavid du Colombier iunlock(&q->lk); 996*8ccd4a63SDavid du Colombier 997*8ccd4a63SDavid du Colombier /* wakeup flow controlled writers */ 998*8ccd4a63SDavid du Colombier if(dowakeup){ 999*8ccd4a63SDavid du Colombier if(q->kick) 1000*8ccd4a63SDavid du Colombier q->kick(q->arg); 1001*8ccd4a63SDavid du Colombier wakeup(&q->wr); 1002*8ccd4a63SDavid du Colombier } 1003*8ccd4a63SDavid du Colombier } 1004*8ccd4a63SDavid du Colombier 1005*8ccd4a63SDavid du Colombier /* 1006*8ccd4a63SDavid du Colombier * get next block from a queue (up to a limit) 1007*8ccd4a63SDavid du Colombier */ 1008*8ccd4a63SDavid du Colombier Block* 1009*8ccd4a63SDavid du Colombier qbread(Queue *q, int len) 1010*8ccd4a63SDavid du Colombier { 1011*8ccd4a63SDavid du Colombier Block *b, *nb; 1012*8ccd4a63SDavid du Colombier int n; 1013*8ccd4a63SDavid du Colombier 1014*8ccd4a63SDavid du Colombier qlock(&q->rlock); 1015*8ccd4a63SDavid du Colombier if(waserror()){ 1016*8ccd4a63SDavid du Colombier qunlock(&q->rlock); 1017*8ccd4a63SDavid du Colombier nexterror(); 1018*8ccd4a63SDavid du Colombier } 1019*8ccd4a63SDavid du Colombier 1020*8ccd4a63SDavid du Colombier ilock(&q->lk); 1021*8ccd4a63SDavid du Colombier switch(qwait(q)){ 1022*8ccd4a63SDavid du Colombier case 0: 1023*8ccd4a63SDavid du Colombier /* queue closed */ 1024*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1025*8ccd4a63SDavid du Colombier qunlock(&q->rlock); 1026*8ccd4a63SDavid du Colombier poperror(); 1027*8ccd4a63SDavid du Colombier return nil; 1028*8ccd4a63SDavid du Colombier case -1: 1029*8ccd4a63SDavid du Colombier /* multiple reads on a closed queue */ 1030*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1031*8ccd4a63SDavid du Colombier error(q->err); 1032*8ccd4a63SDavid du Colombier } 1033*8ccd4a63SDavid du Colombier 1034*8ccd4a63SDavid du Colombier /* if we get here, there's at least one block in the queue */ 1035*8ccd4a63SDavid du Colombier b = qremove(q); 1036*8ccd4a63SDavid du Colombier n = BLEN(b); 1037*8ccd4a63SDavid du Colombier 1038*8ccd4a63SDavid du Colombier /* split block if it's too big and this is not a message queue */ 1039*8ccd4a63SDavid du Colombier nb = b; 1040*8ccd4a63SDavid du Colombier if(n > len){ 1041*8ccd4a63SDavid du Colombier if((q->state&Qmsg) == 0){ 1042*8ccd4a63SDavid du Colombier n -= len; 1043*8ccd4a63SDavid du Colombier b = allocb(n); 1044*8ccd4a63SDavid du Colombier memmove(b->wp, nb->rp+len, n); 1045*8ccd4a63SDavid du Colombier b->wp += n; 1046*8ccd4a63SDavid du Colombier qputback(q, b); 1047*8ccd4a63SDavid du Colombier } 1048*8ccd4a63SDavid du Colombier nb->wp = nb->rp + len; 1049*8ccd4a63SDavid du Colombier } 1050*8ccd4a63SDavid du Colombier 1051*8ccd4a63SDavid du Colombier /* restart producer */ 1052*8ccd4a63SDavid du Colombier qwakeup_iunlock(q); 1053*8ccd4a63SDavid du Colombier 1054*8ccd4a63SDavid du Colombier poperror(); 1055*8ccd4a63SDavid du Colombier qunlock(&q->rlock); 1056*8ccd4a63SDavid du Colombier return nb; 1057*8ccd4a63SDavid du Colombier } 1058*8ccd4a63SDavid du Colombier 1059*8ccd4a63SDavid du Colombier /* 1060*8ccd4a63SDavid du Colombier * read a queue. if no data is queued, post a Block 1061*8ccd4a63SDavid du Colombier * and wait on its Rendez. 1062*8ccd4a63SDavid du Colombier */ 1063*8ccd4a63SDavid du Colombier long 1064*8ccd4a63SDavid du Colombier qread(Queue *q, void *vp, int len) 1065*8ccd4a63SDavid du Colombier { 1066*8ccd4a63SDavid du Colombier Block *b, *first, **l; 1067*8ccd4a63SDavid du Colombier int m, n; 1068*8ccd4a63SDavid du Colombier 1069*8ccd4a63SDavid du Colombier qlock(&q->rlock); 1070*8ccd4a63SDavid du Colombier if(waserror()){ 1071*8ccd4a63SDavid du Colombier qunlock(&q->rlock); 1072*8ccd4a63SDavid du Colombier nexterror(); 1073*8ccd4a63SDavid du Colombier } 1074*8ccd4a63SDavid du Colombier 1075*8ccd4a63SDavid du Colombier ilock(&q->lk); 1076*8ccd4a63SDavid du Colombier again: 1077*8ccd4a63SDavid du Colombier switch(qwait(q)){ 1078*8ccd4a63SDavid du Colombier case 0: 1079*8ccd4a63SDavid du Colombier /* queue closed */ 1080*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1081*8ccd4a63SDavid du Colombier qunlock(&q->rlock); 1082*8ccd4a63SDavid du Colombier poperror(); 1083*8ccd4a63SDavid du Colombier return 0; 1084*8ccd4a63SDavid du Colombier case -1: 1085*8ccd4a63SDavid du Colombier /* multiple reads on a closed queue */ 1086*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1087*8ccd4a63SDavid du Colombier error(q->err); 1088*8ccd4a63SDavid du Colombier } 1089*8ccd4a63SDavid du Colombier 1090*8ccd4a63SDavid du Colombier /* if we get here, there's at least one block in the queue */ 1091*8ccd4a63SDavid du Colombier if(q->state & Qcoalesce){ 1092*8ccd4a63SDavid du Colombier /* when coalescing, 0 length blocks just go away */ 1093*8ccd4a63SDavid du Colombier b = q->bfirst; 1094*8ccd4a63SDavid du Colombier if(BLEN(b) <= 0){ 1095*8ccd4a63SDavid du Colombier freeb(qremove(q)); 1096*8ccd4a63SDavid du Colombier goto again; 1097*8ccd4a63SDavid du Colombier } 1098*8ccd4a63SDavid du Colombier 1099*8ccd4a63SDavid du Colombier /* grab the first block plus as many 1100*8ccd4a63SDavid du Colombier * following blocks as will completely 1101*8ccd4a63SDavid du Colombier * fit in the read. 1102*8ccd4a63SDavid du Colombier */ 1103*8ccd4a63SDavid du Colombier n = 0; 1104*8ccd4a63SDavid du Colombier l = &first; 1105*8ccd4a63SDavid du Colombier m = BLEN(b); 1106*8ccd4a63SDavid du Colombier for(;;) { 1107*8ccd4a63SDavid du Colombier *l = qremove(q); 1108*8ccd4a63SDavid du Colombier l = &b->next; 1109*8ccd4a63SDavid du Colombier n += m; 1110*8ccd4a63SDavid du Colombier 1111*8ccd4a63SDavid du Colombier b = q->bfirst; 1112*8ccd4a63SDavid du Colombier if(b == nil) 1113*8ccd4a63SDavid du Colombier break; 1114*8ccd4a63SDavid du Colombier m = BLEN(b); 1115*8ccd4a63SDavid du Colombier if(n+m > len) 1116*8ccd4a63SDavid du Colombier break; 1117*8ccd4a63SDavid du Colombier } 1118*8ccd4a63SDavid du Colombier } else { 1119*8ccd4a63SDavid du Colombier first = qremove(q); 1120*8ccd4a63SDavid du Colombier n = BLEN(first); 1121*8ccd4a63SDavid du Colombier } 1122*8ccd4a63SDavid du Colombier 1123*8ccd4a63SDavid du Colombier /* copy to user space outside of the ilock */ 1124*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1125*8ccd4a63SDavid du Colombier b = bl2mem(vp, first, len); 1126*8ccd4a63SDavid du Colombier ilock(&q->lk); 1127*8ccd4a63SDavid du Colombier 1128*8ccd4a63SDavid du Colombier /* take care of any left over partial block */ 1129*8ccd4a63SDavid du Colombier if(b != nil){ 1130*8ccd4a63SDavid du Colombier n -= BLEN(b); 1131*8ccd4a63SDavid du Colombier if(q->state & Qmsg) 1132*8ccd4a63SDavid du Colombier freeb(b); 1133*8ccd4a63SDavid du Colombier else 1134*8ccd4a63SDavid du Colombier qputback(q, b); 1135*8ccd4a63SDavid du Colombier } 1136*8ccd4a63SDavid du Colombier 1137*8ccd4a63SDavid du Colombier /* restart producer */ 1138*8ccd4a63SDavid du Colombier qwakeup_iunlock(q); 1139*8ccd4a63SDavid du Colombier 1140*8ccd4a63SDavid du Colombier poperror(); 1141*8ccd4a63SDavid du Colombier qunlock(&q->rlock); 1142*8ccd4a63SDavid du Colombier return n; 1143*8ccd4a63SDavid du Colombier } 1144*8ccd4a63SDavid du Colombier 1145*8ccd4a63SDavid du Colombier static int 1146*8ccd4a63SDavid du Colombier qnotfull(void *a) 1147*8ccd4a63SDavid du Colombier { 1148*8ccd4a63SDavid du Colombier Queue *q = a; 1149*8ccd4a63SDavid du Colombier 1150*8ccd4a63SDavid du Colombier return q->len < q->limit || (q->state & Qclosed); 1151*8ccd4a63SDavid du Colombier } 1152*8ccd4a63SDavid du Colombier 1153*8ccd4a63SDavid du Colombier ulong noblockcnt; 1154*8ccd4a63SDavid du Colombier 1155*8ccd4a63SDavid du Colombier /* 1156*8ccd4a63SDavid du Colombier * add a block to a queue obeying flow control 1157*8ccd4a63SDavid du Colombier */ 1158*8ccd4a63SDavid du Colombier long 1159*8ccd4a63SDavid du Colombier qbwrite(Queue *q, Block *b) 1160*8ccd4a63SDavid du Colombier { 1161*8ccd4a63SDavid du Colombier int n, dowakeup; 1162*8ccd4a63SDavid du Colombier 1163*8ccd4a63SDavid du Colombier n = BLEN(b); 1164*8ccd4a63SDavid du Colombier 1165*8ccd4a63SDavid du Colombier if(q->bypass){ 1166*8ccd4a63SDavid du Colombier (*q->bypass)(q->arg, b); 1167*8ccd4a63SDavid du Colombier return n; 1168*8ccd4a63SDavid du Colombier } 1169*8ccd4a63SDavid du Colombier 1170*8ccd4a63SDavid du Colombier dowakeup = 0; 1171*8ccd4a63SDavid du Colombier qlock(&q->wlock); 1172*8ccd4a63SDavid du Colombier if(waserror()){ 1173*8ccd4a63SDavid du Colombier if(b != nil) 1174*8ccd4a63SDavid du Colombier freeb(b); 1175*8ccd4a63SDavid du Colombier qunlock(&q->wlock); 1176*8ccd4a63SDavid du Colombier nexterror(); 1177*8ccd4a63SDavid du Colombier } 1178*8ccd4a63SDavid du Colombier 1179*8ccd4a63SDavid du Colombier ilock(&q->lk); 1180*8ccd4a63SDavid du Colombier 1181*8ccd4a63SDavid du Colombier /* give up if the queue is closed */ 1182*8ccd4a63SDavid du Colombier if(q->state & Qclosed){ 1183*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1184*8ccd4a63SDavid du Colombier error(q->err); 1185*8ccd4a63SDavid du Colombier } 1186*8ccd4a63SDavid du Colombier 1187*8ccd4a63SDavid du Colombier /* if nonblocking, don't queue over the limit */ 1188*8ccd4a63SDavid du Colombier if(q->len >= q->limit){ 1189*8ccd4a63SDavid du Colombier if(q->noblock){ 1190*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1191*8ccd4a63SDavid du Colombier freeb(b); 1192*8ccd4a63SDavid du Colombier noblockcnt += n; 1193*8ccd4a63SDavid du Colombier qunlock(&q->wlock); 1194*8ccd4a63SDavid du Colombier poperror(); 1195*8ccd4a63SDavid du Colombier return n; 1196*8ccd4a63SDavid du Colombier } 1197*8ccd4a63SDavid du Colombier } 1198*8ccd4a63SDavid du Colombier 1199*8ccd4a63SDavid du Colombier /* queue the block */ 1200*8ccd4a63SDavid du Colombier if(q->bfirst) 1201*8ccd4a63SDavid du Colombier q->blast->next = b; 1202*8ccd4a63SDavid du Colombier else 1203*8ccd4a63SDavid du Colombier q->bfirst = b; 1204*8ccd4a63SDavid du Colombier q->blast = b; 1205*8ccd4a63SDavid du Colombier b->next = 0; 1206*8ccd4a63SDavid du Colombier q->len += BALLOC(b); 1207*8ccd4a63SDavid du Colombier q->dlen += n; 1208*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qbwrite"); 1209*8ccd4a63SDavid du Colombier b = nil; 1210*8ccd4a63SDavid du Colombier 1211*8ccd4a63SDavid du Colombier /* make sure other end gets awakened */ 1212*8ccd4a63SDavid du Colombier if(q->state & Qstarve){ 1213*8ccd4a63SDavid du Colombier q->state &= ~Qstarve; 1214*8ccd4a63SDavid du Colombier dowakeup = 1; 1215*8ccd4a63SDavid du Colombier } 1216*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1217*8ccd4a63SDavid du Colombier 1218*8ccd4a63SDavid du Colombier /* get output going again */ 1219*8ccd4a63SDavid du Colombier if(q->kick && (dowakeup || (q->state&Qkick))) 1220*8ccd4a63SDavid du Colombier q->kick(q->arg); 1221*8ccd4a63SDavid du Colombier 1222*8ccd4a63SDavid du Colombier /* wakeup anyone consuming at the other end */ 1223*8ccd4a63SDavid du Colombier if(dowakeup){ 1224*8ccd4a63SDavid du Colombier wakeup(&q->rr); 1225*8ccd4a63SDavid du Colombier 1226*8ccd4a63SDavid du Colombier /* if we just wokeup a higher priority process, let it run */ 1227*8ccd4a63SDavid du Colombier /* 1228*8ccd4a63SDavid du Colombier p = wakeup(&q->rr); 1229*8ccd4a63SDavid du Colombier if(p != nil && p->priority > up->priority) 1230*8ccd4a63SDavid du Colombier sched(); 1231*8ccd4a63SDavid du Colombier */ 1232*8ccd4a63SDavid du Colombier } 1233*8ccd4a63SDavid du Colombier 1234*8ccd4a63SDavid du Colombier /* 1235*8ccd4a63SDavid du Colombier * flow control, wait for queue to get below the limit 1236*8ccd4a63SDavid du Colombier * before allowing the process to continue and queue 1237*8ccd4a63SDavid du Colombier * more. We do this here so that postnote can only 1238*8ccd4a63SDavid du Colombier * interrupt us after the data has been queued. This 1239*8ccd4a63SDavid du Colombier * means that things like 9p flushes and ssl messages 1240*8ccd4a63SDavid du Colombier * will not be disrupted by software interrupts. 1241*8ccd4a63SDavid du Colombier * 1242*8ccd4a63SDavid du Colombier * Note - this is moderately dangerous since a process 1243*8ccd4a63SDavid du Colombier * that keeps getting interrupted and rewriting will 1244*8ccd4a63SDavid du Colombier * queue infinite crud. 1245*8ccd4a63SDavid du Colombier */ 1246*8ccd4a63SDavid du Colombier for(;;){ 1247*8ccd4a63SDavid du Colombier if(q->noblock || qnotfull(q)) 1248*8ccd4a63SDavid du Colombier break; 1249*8ccd4a63SDavid du Colombier 1250*8ccd4a63SDavid du Colombier ilock(&q->lk); 1251*8ccd4a63SDavid du Colombier q->state |= Qflow; 1252*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1253*8ccd4a63SDavid du Colombier sleep(&q->wr, qnotfull, q); 1254*8ccd4a63SDavid du Colombier } 1255*8ccd4a63SDavid du Colombier USED(b); 1256*8ccd4a63SDavid du Colombier 1257*8ccd4a63SDavid du Colombier qunlock(&q->wlock); 1258*8ccd4a63SDavid du Colombier poperror(); 1259*8ccd4a63SDavid du Colombier return n; 1260*8ccd4a63SDavid du Colombier } 1261*8ccd4a63SDavid du Colombier 1262*8ccd4a63SDavid du Colombier /* 1263*8ccd4a63SDavid du Colombier * write to a queue. only Maxatomic bytes at a time is atomic. 1264*8ccd4a63SDavid du Colombier */ 1265*8ccd4a63SDavid du Colombier int 1266*8ccd4a63SDavid du Colombier qwrite(Queue *q, void *vp, int len) 1267*8ccd4a63SDavid du Colombier { 1268*8ccd4a63SDavid du Colombier int n, sofar; 1269*8ccd4a63SDavid du Colombier Block *b; 1270*8ccd4a63SDavid du Colombier uchar *p = vp; 1271*8ccd4a63SDavid du Colombier 1272*8ccd4a63SDavid du Colombier QDEBUG if(!islo()) 1273*8ccd4a63SDavid du Colombier print("qwrite hi %lux\n", getcallerpc(&q)); 1274*8ccd4a63SDavid du Colombier 1275*8ccd4a63SDavid du Colombier sofar = 0; 1276*8ccd4a63SDavid du Colombier do { 1277*8ccd4a63SDavid du Colombier n = len-sofar; 1278*8ccd4a63SDavid du Colombier if(n > Maxatomic) 1279*8ccd4a63SDavid du Colombier n = Maxatomic; 1280*8ccd4a63SDavid du Colombier 1281*8ccd4a63SDavid du Colombier b = allocb(n); 1282*8ccd4a63SDavid du Colombier /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */ 1283*8ccd4a63SDavid du Colombier if(waserror()){ 1284*8ccd4a63SDavid du Colombier freeb(b); 1285*8ccd4a63SDavid du Colombier nexterror(); 1286*8ccd4a63SDavid du Colombier } 1287*8ccd4a63SDavid du Colombier memmove(b->wp, p+sofar, n); 1288*8ccd4a63SDavid du Colombier poperror(); 1289*8ccd4a63SDavid du Colombier b->wp += n; 1290*8ccd4a63SDavid du Colombier 1291*8ccd4a63SDavid du Colombier qbwrite(q, b); 1292*8ccd4a63SDavid du Colombier 1293*8ccd4a63SDavid du Colombier sofar += n; 1294*8ccd4a63SDavid du Colombier } while(sofar < len && (q->state & Qmsg) == 0); 1295*8ccd4a63SDavid du Colombier 1296*8ccd4a63SDavid du Colombier return len; 1297*8ccd4a63SDavid du Colombier } 1298*8ccd4a63SDavid du Colombier 1299*8ccd4a63SDavid du Colombier /* 1300*8ccd4a63SDavid du Colombier * used by print() to write to a queue. Since we may be splhi or not in 1301*8ccd4a63SDavid du Colombier * a process, don't qlock. 1302*8ccd4a63SDavid du Colombier */ 1303*8ccd4a63SDavid du Colombier int 1304*8ccd4a63SDavid du Colombier qiwrite(Queue *q, void *vp, int len) 1305*8ccd4a63SDavid du Colombier { 1306*8ccd4a63SDavid du Colombier int n, sofar, dowakeup; 1307*8ccd4a63SDavid du Colombier Block *b; 1308*8ccd4a63SDavid du Colombier uchar *p = vp; 1309*8ccd4a63SDavid du Colombier 1310*8ccd4a63SDavid du Colombier dowakeup = 0; 1311*8ccd4a63SDavid du Colombier 1312*8ccd4a63SDavid du Colombier sofar = 0; 1313*8ccd4a63SDavid du Colombier do { 1314*8ccd4a63SDavid du Colombier n = len-sofar; 1315*8ccd4a63SDavid du Colombier if(n > Maxatomic) 1316*8ccd4a63SDavid du Colombier n = Maxatomic; 1317*8ccd4a63SDavid du Colombier 1318*8ccd4a63SDavid du Colombier b = iallocb(n); 1319*8ccd4a63SDavid du Colombier if(b == nil) 1320*8ccd4a63SDavid du Colombier break; 1321*8ccd4a63SDavid du Colombier memmove(b->wp, p+sofar, n); 1322*8ccd4a63SDavid du Colombier b->wp += n; 1323*8ccd4a63SDavid du Colombier 1324*8ccd4a63SDavid du Colombier ilock(&q->lk); 1325*8ccd4a63SDavid du Colombier 1326*8ccd4a63SDavid du Colombier QDEBUG checkb(b, "qiwrite"); 1327*8ccd4a63SDavid du Colombier if(q->bfirst) 1328*8ccd4a63SDavid du Colombier q->blast->next = b; 1329*8ccd4a63SDavid du Colombier else 1330*8ccd4a63SDavid du Colombier q->bfirst = b; 1331*8ccd4a63SDavid du Colombier q->blast = b; 1332*8ccd4a63SDavid du Colombier q->len += BALLOC(b); 1333*8ccd4a63SDavid du Colombier q->dlen += n; 1334*8ccd4a63SDavid du Colombier 1335*8ccd4a63SDavid du Colombier if(q->state & Qstarve){ 1336*8ccd4a63SDavid du Colombier q->state &= ~Qstarve; 1337*8ccd4a63SDavid du Colombier dowakeup = 1; 1338*8ccd4a63SDavid du Colombier } 1339*8ccd4a63SDavid du Colombier 1340*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1341*8ccd4a63SDavid du Colombier 1342*8ccd4a63SDavid du Colombier if(dowakeup){ 1343*8ccd4a63SDavid du Colombier if(q->kick) 1344*8ccd4a63SDavid du Colombier q->kick(q->arg); 1345*8ccd4a63SDavid du Colombier wakeup(&q->rr); 1346*8ccd4a63SDavid du Colombier } 1347*8ccd4a63SDavid du Colombier 1348*8ccd4a63SDavid du Colombier sofar += n; 1349*8ccd4a63SDavid du Colombier } while(sofar < len && (q->state & Qmsg) == 0); 1350*8ccd4a63SDavid du Colombier 1351*8ccd4a63SDavid du Colombier return sofar; 1352*8ccd4a63SDavid du Colombier } 1353*8ccd4a63SDavid du Colombier 1354*8ccd4a63SDavid du Colombier /* 1355*8ccd4a63SDavid du Colombier * be extremely careful when calling this, 1356*8ccd4a63SDavid du Colombier * as there is no reference accounting 1357*8ccd4a63SDavid du Colombier */ 1358*8ccd4a63SDavid du Colombier void 1359*8ccd4a63SDavid du Colombier qfree(Queue *q) 1360*8ccd4a63SDavid du Colombier { 1361*8ccd4a63SDavid du Colombier qclose(q); 1362*8ccd4a63SDavid du Colombier free(q); 1363*8ccd4a63SDavid du Colombier } 1364*8ccd4a63SDavid du Colombier 1365*8ccd4a63SDavid du Colombier /* 1366*8ccd4a63SDavid du Colombier * Mark a queue as closed. No further IO is permitted. 1367*8ccd4a63SDavid du Colombier * All blocks are released. 1368*8ccd4a63SDavid du Colombier */ 1369*8ccd4a63SDavid du Colombier void 1370*8ccd4a63SDavid du Colombier qclose(Queue *q) 1371*8ccd4a63SDavid du Colombier { 1372*8ccd4a63SDavid du Colombier Block *bfirst; 1373*8ccd4a63SDavid du Colombier 1374*8ccd4a63SDavid du Colombier if(q == nil) 1375*8ccd4a63SDavid du Colombier return; 1376*8ccd4a63SDavid du Colombier 1377*8ccd4a63SDavid du Colombier /* mark it */ 1378*8ccd4a63SDavid du Colombier ilock(&q->lk); 1379*8ccd4a63SDavid du Colombier q->state |= Qclosed; 1380*8ccd4a63SDavid du Colombier q->state &= ~(Qflow|Qstarve); 1381*8ccd4a63SDavid du Colombier strcpy(q->err, Ehungup); 1382*8ccd4a63SDavid du Colombier bfirst = q->bfirst; 1383*8ccd4a63SDavid du Colombier q->bfirst = 0; 1384*8ccd4a63SDavid du Colombier q->len = 0; 1385*8ccd4a63SDavid du Colombier q->dlen = 0; 1386*8ccd4a63SDavid du Colombier q->noblock = 0; 1387*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1388*8ccd4a63SDavid du Colombier 1389*8ccd4a63SDavid du Colombier /* free queued blocks */ 1390*8ccd4a63SDavid du Colombier freeblist(bfirst); 1391*8ccd4a63SDavid du Colombier 1392*8ccd4a63SDavid du Colombier /* wake up readers/writers */ 1393*8ccd4a63SDavid du Colombier wakeup(&q->rr); 1394*8ccd4a63SDavid du Colombier wakeup(&q->wr); 1395*8ccd4a63SDavid du Colombier } 1396*8ccd4a63SDavid du Colombier 1397*8ccd4a63SDavid du Colombier /* 1398*8ccd4a63SDavid du Colombier * Mark a queue as closed. Wakeup any readers. Don't remove queued 1399*8ccd4a63SDavid du Colombier * blocks. 1400*8ccd4a63SDavid du Colombier */ 1401*8ccd4a63SDavid du Colombier void 1402*8ccd4a63SDavid du Colombier qhangup(Queue *q, char *msg) 1403*8ccd4a63SDavid du Colombier { 1404*8ccd4a63SDavid du Colombier /* mark it */ 1405*8ccd4a63SDavid du Colombier ilock(&q->lk); 1406*8ccd4a63SDavid du Colombier q->state |= Qclosed; 1407*8ccd4a63SDavid du Colombier if(msg == 0 || *msg == 0) 1408*8ccd4a63SDavid du Colombier strcpy(q->err, Ehungup); 1409*8ccd4a63SDavid du Colombier else 1410*8ccd4a63SDavid du Colombier strncpy(q->err, msg, ERRMAX-1); 1411*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1412*8ccd4a63SDavid du Colombier 1413*8ccd4a63SDavid du Colombier /* wake up readers/writers */ 1414*8ccd4a63SDavid du Colombier wakeup(&q->rr); 1415*8ccd4a63SDavid du Colombier wakeup(&q->wr); 1416*8ccd4a63SDavid du Colombier } 1417*8ccd4a63SDavid du Colombier 1418*8ccd4a63SDavid du Colombier /* 1419*8ccd4a63SDavid du Colombier * return non-zero if the q is hungup 1420*8ccd4a63SDavid du Colombier */ 1421*8ccd4a63SDavid du Colombier int 1422*8ccd4a63SDavid du Colombier qisclosed(Queue *q) 1423*8ccd4a63SDavid du Colombier { 1424*8ccd4a63SDavid du Colombier return q->state & Qclosed; 1425*8ccd4a63SDavid du Colombier } 1426*8ccd4a63SDavid du Colombier 1427*8ccd4a63SDavid du Colombier /* 1428*8ccd4a63SDavid du Colombier * mark a queue as no longer hung up 1429*8ccd4a63SDavid du Colombier */ 1430*8ccd4a63SDavid du Colombier void 1431*8ccd4a63SDavid du Colombier qreopen(Queue *q) 1432*8ccd4a63SDavid du Colombier { 1433*8ccd4a63SDavid du Colombier ilock(&q->lk); 1434*8ccd4a63SDavid du Colombier q->state &= ~Qclosed; 1435*8ccd4a63SDavid du Colombier q->state |= Qstarve; 1436*8ccd4a63SDavid du Colombier q->eof = 0; 1437*8ccd4a63SDavid du Colombier q->limit = q->inilim; 1438*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1439*8ccd4a63SDavid du Colombier } 1440*8ccd4a63SDavid du Colombier 1441*8ccd4a63SDavid du Colombier /* 1442*8ccd4a63SDavid du Colombier * return bytes queued 1443*8ccd4a63SDavid du Colombier */ 1444*8ccd4a63SDavid du Colombier int 1445*8ccd4a63SDavid du Colombier qlen(Queue *q) 1446*8ccd4a63SDavid du Colombier { 1447*8ccd4a63SDavid du Colombier return q->dlen; 1448*8ccd4a63SDavid du Colombier } 1449*8ccd4a63SDavid du Colombier 1450*8ccd4a63SDavid du Colombier /* 1451*8ccd4a63SDavid du Colombier * return space remaining before flow control 1452*8ccd4a63SDavid du Colombier */ 1453*8ccd4a63SDavid du Colombier int 1454*8ccd4a63SDavid du Colombier qwindow(Queue *q) 1455*8ccd4a63SDavid du Colombier { 1456*8ccd4a63SDavid du Colombier int l; 1457*8ccd4a63SDavid du Colombier 1458*8ccd4a63SDavid du Colombier l = q->limit - q->len; 1459*8ccd4a63SDavid du Colombier if(l < 0) 1460*8ccd4a63SDavid du Colombier l = 0; 1461*8ccd4a63SDavid du Colombier return l; 1462*8ccd4a63SDavid du Colombier } 1463*8ccd4a63SDavid du Colombier 1464*8ccd4a63SDavid du Colombier /* 1465*8ccd4a63SDavid du Colombier * return true if we can read without blocking 1466*8ccd4a63SDavid du Colombier */ 1467*8ccd4a63SDavid du Colombier int 1468*8ccd4a63SDavid du Colombier qcanread(Queue *q) 1469*8ccd4a63SDavid du Colombier { 1470*8ccd4a63SDavid du Colombier return q->bfirst!=0; 1471*8ccd4a63SDavid du Colombier } 1472*8ccd4a63SDavid du Colombier 1473*8ccd4a63SDavid du Colombier /* 1474*8ccd4a63SDavid du Colombier * change queue limit 1475*8ccd4a63SDavid du Colombier */ 1476*8ccd4a63SDavid du Colombier void 1477*8ccd4a63SDavid du Colombier qsetlimit(Queue *q, int limit) 1478*8ccd4a63SDavid du Colombier { 1479*8ccd4a63SDavid du Colombier q->limit = limit; 1480*8ccd4a63SDavid du Colombier } 1481*8ccd4a63SDavid du Colombier 1482*8ccd4a63SDavid du Colombier /* 1483*8ccd4a63SDavid du Colombier * set blocking/nonblocking 1484*8ccd4a63SDavid du Colombier */ 1485*8ccd4a63SDavid du Colombier void 1486*8ccd4a63SDavid du Colombier qnoblock(Queue *q, int onoff) 1487*8ccd4a63SDavid du Colombier { 1488*8ccd4a63SDavid du Colombier q->noblock = onoff; 1489*8ccd4a63SDavid du Colombier } 1490*8ccd4a63SDavid du Colombier 1491*8ccd4a63SDavid du Colombier /* 1492*8ccd4a63SDavid du Colombier * flush the output queue 1493*8ccd4a63SDavid du Colombier */ 1494*8ccd4a63SDavid du Colombier void 1495*8ccd4a63SDavid du Colombier qflush(Queue *q) 1496*8ccd4a63SDavid du Colombier { 1497*8ccd4a63SDavid du Colombier Block *bfirst; 1498*8ccd4a63SDavid du Colombier 1499*8ccd4a63SDavid du Colombier /* mark it */ 1500*8ccd4a63SDavid du Colombier ilock(&q->lk); 1501*8ccd4a63SDavid du Colombier bfirst = q->bfirst; 1502*8ccd4a63SDavid du Colombier q->bfirst = 0; 1503*8ccd4a63SDavid du Colombier q->len = 0; 1504*8ccd4a63SDavid du Colombier q->dlen = 0; 1505*8ccd4a63SDavid du Colombier iunlock(&q->lk); 1506*8ccd4a63SDavid du Colombier 1507*8ccd4a63SDavid du Colombier /* free queued blocks */ 1508*8ccd4a63SDavid du Colombier freeblist(bfirst); 1509*8ccd4a63SDavid du Colombier 1510*8ccd4a63SDavid du Colombier /* wake up readers/writers */ 1511*8ccd4a63SDavid du Colombier wakeup(&q->wr); 1512*8ccd4a63SDavid du Colombier } 1513*8ccd4a63SDavid du Colombier 1514*8ccd4a63SDavid du Colombier int 1515*8ccd4a63SDavid du Colombier qfull(Queue *q) 1516*8ccd4a63SDavid du Colombier { 1517*8ccd4a63SDavid du Colombier return q->state & Qflow; 1518*8ccd4a63SDavid du Colombier } 1519*8ccd4a63SDavid du Colombier 1520*8ccd4a63SDavid du Colombier int 1521*8ccd4a63SDavid du Colombier qstate(Queue *q) 1522*8ccd4a63SDavid du Colombier { 1523*8ccd4a63SDavid du Colombier return q->state; 1524*8ccd4a63SDavid du Colombier } 1525