19ef1f84bSDavid du Colombier #include "u.h"
29ef1f84bSDavid du Colombier #include "../port/lib.h"
39ef1f84bSDavid du Colombier #include "mem.h"
49ef1f84bSDavid du Colombier #include "dat.h"
59ef1f84bSDavid du Colombier #include "fns.h"
69ef1f84bSDavid du Colombier #include "../port/error.h"
79ef1f84bSDavid du Colombier
89ef1f84bSDavid du Colombier static ulong padblockcnt;
99ef1f84bSDavid du Colombier static ulong concatblockcnt;
109ef1f84bSDavid du Colombier static ulong pullupblockcnt;
119ef1f84bSDavid du Colombier static ulong copyblockcnt;
129ef1f84bSDavid du Colombier static ulong consumecnt;
139ef1f84bSDavid du Colombier static ulong producecnt;
149ef1f84bSDavid du Colombier static ulong qcopycnt;
159ef1f84bSDavid du Colombier
169ef1f84bSDavid du Colombier static int debugging;
179ef1f84bSDavid du Colombier
189ef1f84bSDavid du Colombier #define QDEBUG if(0)
199ef1f84bSDavid du Colombier
209ef1f84bSDavid du Colombier /*
219ef1f84bSDavid du Colombier * IO queues
229ef1f84bSDavid du Colombier */
239ef1f84bSDavid du Colombier typedef struct Queue Queue;
249ef1f84bSDavid du Colombier
259ef1f84bSDavid du Colombier struct Queue
269ef1f84bSDavid du Colombier {
279ef1f84bSDavid du Colombier Lock;
289ef1f84bSDavid du Colombier
299ef1f84bSDavid du Colombier Block* bfirst; /* buffer */
309ef1f84bSDavid du Colombier Block* blast;
319ef1f84bSDavid du Colombier
329ef1f84bSDavid du Colombier int len; /* bytes allocated to queue */
339ef1f84bSDavid du Colombier int dlen; /* data bytes in queue */
349ef1f84bSDavid du Colombier int limit; /* max bytes in queue */
359ef1f84bSDavid du Colombier int inilim; /* initial limit */
369ef1f84bSDavid du Colombier int state;
379ef1f84bSDavid du Colombier int noblock; /* true if writes return immediately when q full */
389ef1f84bSDavid du Colombier int eof; /* number of eofs read by user */
399ef1f84bSDavid du Colombier
409ef1f84bSDavid du Colombier void (*kick)(void*); /* restart output */
419ef1f84bSDavid du Colombier void (*bypass)(void*, Block*); /* bypass queue altogether */
429ef1f84bSDavid du Colombier void* arg; /* argument to kick */
439ef1f84bSDavid du Colombier
449ef1f84bSDavid du Colombier QLock rlock; /* mutex for reading processes */
459ef1f84bSDavid du Colombier Rendez rr; /* process waiting to read */
469ef1f84bSDavid du Colombier QLock wlock; /* mutex for writing processes */
479ef1f84bSDavid du Colombier Rendez wr; /* process waiting to write */
489ef1f84bSDavid du Colombier
499ef1f84bSDavid du Colombier char err[ERRMAX];
509ef1f84bSDavid du Colombier };
519ef1f84bSDavid du Colombier
529ef1f84bSDavid du Colombier enum
539ef1f84bSDavid du Colombier {
549ef1f84bSDavid du Colombier Maxatomic = 64*1024,
559ef1f84bSDavid du Colombier };
569ef1f84bSDavid du Colombier
579ef1f84bSDavid du Colombier uint qiomaxatomic = Maxatomic;
589ef1f84bSDavid du Colombier
599ef1f84bSDavid du Colombier void
ixsummary(void)609ef1f84bSDavid du Colombier ixsummary(void)
619ef1f84bSDavid du Colombier {
629ef1f84bSDavid du Colombier debugging ^= 1;
639ef1f84bSDavid du Colombier iallocsummary();
649ef1f84bSDavid du Colombier print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
659ef1f84bSDavid du Colombier padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
669ef1f84bSDavid du Colombier print("consume %lud, produce %lud, qcopy %lud\n",
679ef1f84bSDavid du Colombier consumecnt, producecnt, qcopycnt);
689ef1f84bSDavid du Colombier }
699ef1f84bSDavid du Colombier
709ef1f84bSDavid du Colombier /*
719ef1f84bSDavid du Colombier * free a list of blocks
729ef1f84bSDavid du Colombier */
739ef1f84bSDavid du Colombier void
freeblist(Block * b)749ef1f84bSDavid du Colombier freeblist(Block *b)
759ef1f84bSDavid du Colombier {
769ef1f84bSDavid du Colombier Block *next;
779ef1f84bSDavid du Colombier
789ef1f84bSDavid du Colombier for(; b != 0; b = next){
799ef1f84bSDavid du Colombier next = b->next;
809ef1f84bSDavid du Colombier b->next = 0;
819ef1f84bSDavid du Colombier freeb(b);
829ef1f84bSDavid du Colombier }
839ef1f84bSDavid du Colombier }
849ef1f84bSDavid du Colombier
859ef1f84bSDavid du Colombier /*
869ef1f84bSDavid du Colombier * pad a block to the front (or the back if size is negative)
879ef1f84bSDavid du Colombier */
889ef1f84bSDavid du Colombier Block*
padblock(Block * bp,int size)899ef1f84bSDavid du Colombier padblock(Block *bp, int size)
909ef1f84bSDavid du Colombier {
919ef1f84bSDavid du Colombier int n;
929ef1f84bSDavid du Colombier Block *nbp;
939ef1f84bSDavid du Colombier
949ef1f84bSDavid du Colombier QDEBUG checkb(bp, "padblock 1");
959ef1f84bSDavid du Colombier if(size >= 0){
969ef1f84bSDavid du Colombier if(bp->rp - bp->base >= size){
979ef1f84bSDavid du Colombier bp->rp -= size;
989ef1f84bSDavid du Colombier return bp;
999ef1f84bSDavid du Colombier }
1009ef1f84bSDavid du Colombier
1019ef1f84bSDavid du Colombier if(bp->next)
1029ef1f84bSDavid du Colombier panic("padblock %#p", getcallerpc(&bp));
1039ef1f84bSDavid du Colombier n = BLEN(bp);
1049ef1f84bSDavid du Colombier padblockcnt++;
1059ef1f84bSDavid du Colombier nbp = allocb(size+n);
1069ef1f84bSDavid du Colombier nbp->rp += size;
1079ef1f84bSDavid du Colombier nbp->wp = nbp->rp;
1089ef1f84bSDavid du Colombier memmove(nbp->wp, bp->rp, n);
1099ef1f84bSDavid du Colombier nbp->wp += n;
1109ef1f84bSDavid du Colombier freeb(bp);
1119ef1f84bSDavid du Colombier nbp->rp -= size;
1129ef1f84bSDavid du Colombier } else {
1139ef1f84bSDavid du Colombier size = -size;
1149ef1f84bSDavid du Colombier
1159ef1f84bSDavid du Colombier if(bp->next)
1169ef1f84bSDavid du Colombier panic("padblock %#p", getcallerpc(&bp));
1179ef1f84bSDavid du Colombier
1189ef1f84bSDavid du Colombier if(bp->lim - bp->wp >= size)
1199ef1f84bSDavid du Colombier return bp;
1209ef1f84bSDavid du Colombier
1219ef1f84bSDavid du Colombier n = BLEN(bp);
1229ef1f84bSDavid du Colombier padblockcnt++;
1239ef1f84bSDavid du Colombier nbp = allocb(size+n);
1249ef1f84bSDavid du Colombier memmove(nbp->wp, bp->rp, n);
1259ef1f84bSDavid du Colombier nbp->wp += n;
1269ef1f84bSDavid du Colombier freeb(bp);
1279ef1f84bSDavid du Colombier }
1289ef1f84bSDavid du Colombier QDEBUG checkb(nbp, "padblock 1");
1299ef1f84bSDavid du Colombier return nbp;
1309ef1f84bSDavid du Colombier }
1319ef1f84bSDavid du Colombier
1329ef1f84bSDavid du Colombier /*
1339ef1f84bSDavid du Colombier * return count of bytes in a string of blocks
1349ef1f84bSDavid du Colombier */
1359ef1f84bSDavid du Colombier int
blocklen(Block * bp)1369ef1f84bSDavid du Colombier blocklen(Block *bp)
1379ef1f84bSDavid du Colombier {
1389ef1f84bSDavid du Colombier int len;
1399ef1f84bSDavid du Colombier
1409ef1f84bSDavid du Colombier len = 0;
1419ef1f84bSDavid du Colombier while(bp) {
1429ef1f84bSDavid du Colombier len += BLEN(bp);
1439ef1f84bSDavid du Colombier bp = bp->next;
1449ef1f84bSDavid du Colombier }
1459ef1f84bSDavid du Colombier return len;
1469ef1f84bSDavid du Colombier }
1479ef1f84bSDavid du Colombier
1489ef1f84bSDavid du Colombier /*
1499ef1f84bSDavid du Colombier * return count of space in blocks
1509ef1f84bSDavid du Colombier */
1519ef1f84bSDavid du Colombier int
blockalloclen(Block * bp)1529ef1f84bSDavid du Colombier blockalloclen(Block *bp)
1539ef1f84bSDavid du Colombier {
1549ef1f84bSDavid du Colombier int len;
1559ef1f84bSDavid du Colombier
1569ef1f84bSDavid du Colombier len = 0;
1579ef1f84bSDavid du Colombier while(bp) {
1589ef1f84bSDavid du Colombier len += BALLOC(bp);
1599ef1f84bSDavid du Colombier bp = bp->next;
1609ef1f84bSDavid du Colombier }
1619ef1f84bSDavid du Colombier return len;
1629ef1f84bSDavid du Colombier }
1639ef1f84bSDavid du Colombier
1649ef1f84bSDavid du Colombier /*
1659ef1f84bSDavid du Colombier * copy the string of blocks into
1669ef1f84bSDavid du Colombier * a single block and free the string
1679ef1f84bSDavid du Colombier */
1689ef1f84bSDavid du Colombier Block*
concatblock(Block * bp)1699ef1f84bSDavid du Colombier concatblock(Block *bp)
1709ef1f84bSDavid du Colombier {
1719ef1f84bSDavid du Colombier int len;
1729ef1f84bSDavid du Colombier Block *nb, *f;
1739ef1f84bSDavid du Colombier
1749ef1f84bSDavid du Colombier if(bp->next == 0)
1759ef1f84bSDavid du Colombier return bp;
1769ef1f84bSDavid du Colombier
1779ef1f84bSDavid du Colombier nb = allocb(blocklen(bp));
1789ef1f84bSDavid du Colombier for(f = bp; f; f = f->next) {
1799ef1f84bSDavid du Colombier len = BLEN(f);
1809ef1f84bSDavid du Colombier memmove(nb->wp, f->rp, len);
1819ef1f84bSDavid du Colombier nb->wp += len;
1829ef1f84bSDavid du Colombier }
1839ef1f84bSDavid du Colombier concatblockcnt += BLEN(nb);
1849ef1f84bSDavid du Colombier freeblist(bp);
1859ef1f84bSDavid du Colombier QDEBUG checkb(nb, "concatblock 1");
1869ef1f84bSDavid du Colombier return nb;
1879ef1f84bSDavid du Colombier }
1889ef1f84bSDavid du Colombier
1899ef1f84bSDavid du Colombier /*
1909ef1f84bSDavid du Colombier * make sure the first block has at least n bytes
1919ef1f84bSDavid du Colombier */
1929ef1f84bSDavid du Colombier Block*
pullupblock(Block * bp,int n)1939ef1f84bSDavid du Colombier pullupblock(Block *bp, int n)
1949ef1f84bSDavid du Colombier {
1959ef1f84bSDavid du Colombier int i;
1969ef1f84bSDavid du Colombier Block *nbp;
1979ef1f84bSDavid du Colombier
1989ef1f84bSDavid du Colombier /*
1999ef1f84bSDavid du Colombier * this should almost always be true, it's
2009ef1f84bSDavid du Colombier * just to avoid every caller checking.
2019ef1f84bSDavid du Colombier */
2029ef1f84bSDavid du Colombier if(BLEN(bp) >= n)
2039ef1f84bSDavid du Colombier return bp;
2049ef1f84bSDavid du Colombier
2059ef1f84bSDavid du Colombier /*
2069ef1f84bSDavid du Colombier * if not enough room in the first block,
2079ef1f84bSDavid du Colombier * add another to the front of the list.
2089ef1f84bSDavid du Colombier */
2099ef1f84bSDavid du Colombier if(bp->lim - bp->rp < n){
2109ef1f84bSDavid du Colombier nbp = allocb(n);
2119ef1f84bSDavid du Colombier nbp->next = bp;
2129ef1f84bSDavid du Colombier bp = nbp;
2139ef1f84bSDavid du Colombier }
2149ef1f84bSDavid du Colombier
2159ef1f84bSDavid du Colombier /*
2169ef1f84bSDavid du Colombier * copy bytes from the trailing blocks into the first
2179ef1f84bSDavid du Colombier */
2189ef1f84bSDavid du Colombier n -= BLEN(bp);
2199ef1f84bSDavid du Colombier while(nbp = bp->next){
2209ef1f84bSDavid du Colombier i = BLEN(nbp);
2219ef1f84bSDavid du Colombier if(i > n) {
2229ef1f84bSDavid du Colombier memmove(bp->wp, nbp->rp, n);
2239ef1f84bSDavid du Colombier pullupblockcnt++;
2249ef1f84bSDavid du Colombier bp->wp += n;
2259ef1f84bSDavid du Colombier nbp->rp += n;
2269ef1f84bSDavid du Colombier QDEBUG checkb(bp, "pullupblock 1");
2279ef1f84bSDavid du Colombier return bp;
2289ef1f84bSDavid du Colombier } else {
2299ef1f84bSDavid du Colombier /* shouldn't happen but why crash if it does */
2309ef1f84bSDavid du Colombier if(i < 0){
2319ef1f84bSDavid du Colombier print("pullupblock -ve length, from %#p\n",
2329ef1f84bSDavid du Colombier getcallerpc(&bp));
2339ef1f84bSDavid du Colombier i = 0;
2349ef1f84bSDavid du Colombier }
2359ef1f84bSDavid du Colombier memmove(bp->wp, nbp->rp, i);
2369ef1f84bSDavid du Colombier pullupblockcnt++;
2379ef1f84bSDavid du Colombier bp->wp += i;
2389ef1f84bSDavid du Colombier bp->next = nbp->next;
2399ef1f84bSDavid du Colombier nbp->next = 0;
2409ef1f84bSDavid du Colombier freeb(nbp);
2419ef1f84bSDavid du Colombier n -= i;
2429ef1f84bSDavid du Colombier if(n == 0){
2439ef1f84bSDavid du Colombier QDEBUG checkb(bp, "pullupblock 2");
2449ef1f84bSDavid du Colombier return bp;
2459ef1f84bSDavid du Colombier }
2469ef1f84bSDavid du Colombier }
2479ef1f84bSDavid du Colombier }
2489ef1f84bSDavid du Colombier freeb(bp);
2499ef1f84bSDavid du Colombier return 0;
2509ef1f84bSDavid du Colombier }
2519ef1f84bSDavid du Colombier
2529ef1f84bSDavid du Colombier /*
2539ef1f84bSDavid du Colombier * make sure the first block has at least n bytes
2549ef1f84bSDavid du Colombier */
2559ef1f84bSDavid du Colombier Block*
pullupqueue(Queue * q,int n)2569ef1f84bSDavid du Colombier pullupqueue(Queue *q, int n)
2579ef1f84bSDavid du Colombier {
2589ef1f84bSDavid du Colombier Block *b;
2599ef1f84bSDavid du Colombier
2609ef1f84bSDavid du Colombier if(BLEN(q->bfirst) >= n)
2619ef1f84bSDavid du Colombier return q->bfirst;
2629ef1f84bSDavid du Colombier q->bfirst = pullupblock(q->bfirst, n);
2639ef1f84bSDavid du Colombier for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
2649ef1f84bSDavid du Colombier ;
2659ef1f84bSDavid du Colombier q->blast = b;
2669ef1f84bSDavid du Colombier return q->bfirst;
2679ef1f84bSDavid du Colombier }
2689ef1f84bSDavid du Colombier
2699ef1f84bSDavid du Colombier /*
2709ef1f84bSDavid du Colombier * trim to len bytes starting at offset
2719ef1f84bSDavid du Colombier */
2729ef1f84bSDavid du Colombier Block *
trimblock(Block * bp,int offset,int len)2739ef1f84bSDavid du Colombier trimblock(Block *bp, int offset, int len)
2749ef1f84bSDavid du Colombier {
2759ef1f84bSDavid du Colombier long l;
2769ef1f84bSDavid du Colombier Block *nb, *startb;
2779ef1f84bSDavid du Colombier
2789ef1f84bSDavid du Colombier QDEBUG checkb(bp, "trimblock 1");
2799ef1f84bSDavid du Colombier if(blocklen(bp) < offset+len) {
2809ef1f84bSDavid du Colombier freeblist(bp);
2819ef1f84bSDavid du Colombier return nil;
2829ef1f84bSDavid du Colombier }
2839ef1f84bSDavid du Colombier
2849ef1f84bSDavid du Colombier while((l = BLEN(bp)) < offset) {
2859ef1f84bSDavid du Colombier offset -= l;
2869ef1f84bSDavid du Colombier nb = bp->next;
2879ef1f84bSDavid du Colombier bp->next = nil;
2889ef1f84bSDavid du Colombier freeb(bp);
2899ef1f84bSDavid du Colombier bp = nb;
2909ef1f84bSDavid du Colombier }
2919ef1f84bSDavid du Colombier
2929ef1f84bSDavid du Colombier startb = bp;
2939ef1f84bSDavid du Colombier bp->rp += offset;
2949ef1f84bSDavid du Colombier
2959ef1f84bSDavid du Colombier while((l = BLEN(bp)) < len) {
2969ef1f84bSDavid du Colombier len -= l;
2979ef1f84bSDavid du Colombier bp = bp->next;
2989ef1f84bSDavid du Colombier }
2999ef1f84bSDavid du Colombier
3009ef1f84bSDavid du Colombier bp->wp -= (BLEN(bp) - len);
3019ef1f84bSDavid du Colombier
3029ef1f84bSDavid du Colombier if(bp->next) {
3039ef1f84bSDavid du Colombier freeblist(bp->next);
3049ef1f84bSDavid du Colombier bp->next = nil;
3059ef1f84bSDavid du Colombier }
3069ef1f84bSDavid du Colombier
3079ef1f84bSDavid du Colombier return startb;
3089ef1f84bSDavid du Colombier }
3099ef1f84bSDavid du Colombier
3109ef1f84bSDavid du Colombier /*
3119ef1f84bSDavid du Colombier * copy 'count' bytes into a new block
3129ef1f84bSDavid du Colombier */
3139ef1f84bSDavid du Colombier Block*
copyblock(Block * bp,int count)3149ef1f84bSDavid du Colombier copyblock(Block *bp, int count)
3159ef1f84bSDavid du Colombier {
3169ef1f84bSDavid du Colombier int l;
3179ef1f84bSDavid du Colombier Block *nbp;
3189ef1f84bSDavid du Colombier
3199ef1f84bSDavid du Colombier QDEBUG checkb(bp, "copyblock 0");
3209ef1f84bSDavid du Colombier if(bp->flag & BINTR){
3219ef1f84bSDavid du Colombier nbp = iallocb(count);
3229ef1f84bSDavid du Colombier if(nbp == nil)
3239ef1f84bSDavid du Colombier return nil;
3249ef1f84bSDavid du Colombier }else
3259ef1f84bSDavid du Colombier nbp = allocb(count);
3269ef1f84bSDavid du Colombier for(; count > 0 && bp != 0; bp = bp->next){
3279ef1f84bSDavid du Colombier l = BLEN(bp);
3289ef1f84bSDavid du Colombier if(l > count)
3299ef1f84bSDavid du Colombier l = count;
3309ef1f84bSDavid du Colombier memmove(nbp->wp, bp->rp, l);
3319ef1f84bSDavid du Colombier nbp->wp += l;
3329ef1f84bSDavid du Colombier count -= l;
3339ef1f84bSDavid du Colombier }
3349ef1f84bSDavid du Colombier if(count > 0){
3359ef1f84bSDavid du Colombier memset(nbp->wp, 0, count);
3369ef1f84bSDavid du Colombier nbp->wp += count;
3379ef1f84bSDavid du Colombier }
3389ef1f84bSDavid du Colombier copyblockcnt++;
3399ef1f84bSDavid du Colombier QDEBUG checkb(nbp, "copyblock 1");
3409ef1f84bSDavid du Colombier
3419ef1f84bSDavid du Colombier return nbp;
3429ef1f84bSDavid du Colombier }
3439ef1f84bSDavid du Colombier
3449ef1f84bSDavid du Colombier Block*
adjustblock(Block * bp,int len)3459ef1f84bSDavid du Colombier adjustblock(Block* bp, int len)
3469ef1f84bSDavid du Colombier {
3479ef1f84bSDavid du Colombier int n;
3489ef1f84bSDavid du Colombier Block *nbp;
3499ef1f84bSDavid du Colombier
3509ef1f84bSDavid du Colombier if(len < 0){
3519ef1f84bSDavid du Colombier freeb(bp);
3529ef1f84bSDavid du Colombier return nil;
3539ef1f84bSDavid du Colombier }
3549ef1f84bSDavid du Colombier
3559ef1f84bSDavid du Colombier if(bp->rp+len > bp->lim){
3569ef1f84bSDavid du Colombier nbp = copyblock(bp, len);
3579ef1f84bSDavid du Colombier freeblist(bp);
3589ef1f84bSDavid du Colombier QDEBUG checkb(nbp, "adjustblock 1");
3599ef1f84bSDavid du Colombier
3609ef1f84bSDavid du Colombier return nbp;
3619ef1f84bSDavid du Colombier }
3629ef1f84bSDavid du Colombier
3639ef1f84bSDavid du Colombier n = BLEN(bp);
3649ef1f84bSDavid du Colombier if(len > n)
3659ef1f84bSDavid du Colombier memset(bp->wp, 0, len-n);
3669ef1f84bSDavid du Colombier bp->wp = bp->rp+len;
3679ef1f84bSDavid du Colombier QDEBUG checkb(bp, "adjustblock 2");
3689ef1f84bSDavid du Colombier
3699ef1f84bSDavid du Colombier return bp;
3709ef1f84bSDavid du Colombier }
3719ef1f84bSDavid du Colombier
3729ef1f84bSDavid du Colombier
3739ef1f84bSDavid du Colombier /*
3749ef1f84bSDavid du Colombier * throw away up to count bytes from a
3759ef1f84bSDavid du Colombier * list of blocks. Return count of bytes
3769ef1f84bSDavid du Colombier * thrown away.
3779ef1f84bSDavid du Colombier */
3789ef1f84bSDavid du Colombier int
pullblock(Block ** bph,int count)3799ef1f84bSDavid du Colombier pullblock(Block **bph, int count)
3809ef1f84bSDavid du Colombier {
3819ef1f84bSDavid du Colombier Block *bp;
3829ef1f84bSDavid du Colombier int n, bytes;
3839ef1f84bSDavid du Colombier
3849ef1f84bSDavid du Colombier bytes = 0;
3859ef1f84bSDavid du Colombier if(bph == nil)
3869ef1f84bSDavid du Colombier return 0;
3879ef1f84bSDavid du Colombier
3889ef1f84bSDavid du Colombier while(*bph != nil && count != 0) {
3899ef1f84bSDavid du Colombier bp = *bph;
3909ef1f84bSDavid du Colombier n = BLEN(bp);
3919ef1f84bSDavid du Colombier if(count < n)
3929ef1f84bSDavid du Colombier n = count;
3939ef1f84bSDavid du Colombier bytes += n;
3949ef1f84bSDavid du Colombier count -= n;
3959ef1f84bSDavid du Colombier bp->rp += n;
3969ef1f84bSDavid du Colombier QDEBUG checkb(bp, "pullblock ");
3979ef1f84bSDavid du Colombier if(BLEN(bp) == 0) {
3989ef1f84bSDavid du Colombier *bph = bp->next;
3999ef1f84bSDavid du Colombier bp->next = nil;
4009ef1f84bSDavid du Colombier freeb(bp);
4019ef1f84bSDavid du Colombier }
4029ef1f84bSDavid du Colombier }
4039ef1f84bSDavid du Colombier return bytes;
4049ef1f84bSDavid du Colombier }
4059ef1f84bSDavid du Colombier
4069ef1f84bSDavid du Colombier /*
4079ef1f84bSDavid du Colombier * get next block from a queue, return null if nothing there
4089ef1f84bSDavid du Colombier */
4099ef1f84bSDavid du Colombier Block*
qget(Queue * q)4109ef1f84bSDavid du Colombier qget(Queue *q)
4119ef1f84bSDavid du Colombier {
4129ef1f84bSDavid du Colombier int dowakeup;
4139ef1f84bSDavid du Colombier Block *b;
4149ef1f84bSDavid du Colombier
4159ef1f84bSDavid du Colombier /* sync with qwrite */
4169ef1f84bSDavid du Colombier ilock(q);
4179ef1f84bSDavid du Colombier
4189ef1f84bSDavid du Colombier b = q->bfirst;
4199ef1f84bSDavid du Colombier if(b == nil){
4209ef1f84bSDavid du Colombier q->state |= Qstarve;
4219ef1f84bSDavid du Colombier iunlock(q);
4229ef1f84bSDavid du Colombier return nil;
4239ef1f84bSDavid du Colombier }
4249ef1f84bSDavid du Colombier q->bfirst = b->next;
4259ef1f84bSDavid du Colombier b->next = 0;
4269ef1f84bSDavid du Colombier q->len -= BALLOC(b);
4279ef1f84bSDavid du Colombier q->dlen -= BLEN(b);
4289ef1f84bSDavid du Colombier QDEBUG checkb(b, "qget");
4299ef1f84bSDavid du Colombier
4309ef1f84bSDavid du Colombier /* if writer flow controlled, restart */
4319ef1f84bSDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){
4329ef1f84bSDavid du Colombier q->state &= ~Qflow;
4339ef1f84bSDavid du Colombier dowakeup = 1;
4349ef1f84bSDavid du Colombier } else
4359ef1f84bSDavid du Colombier dowakeup = 0;
4369ef1f84bSDavid du Colombier
4379ef1f84bSDavid du Colombier iunlock(q);
4389ef1f84bSDavid du Colombier
4399ef1f84bSDavid du Colombier if(dowakeup)
4409ef1f84bSDavid du Colombier wakeup(&q->wr);
4419ef1f84bSDavid du Colombier
4429ef1f84bSDavid du Colombier return b;
4439ef1f84bSDavid du Colombier }
4449ef1f84bSDavid du Colombier
4459ef1f84bSDavid du Colombier /*
4469ef1f84bSDavid du Colombier * throw away the next 'len' bytes in the queue
4479ef1f84bSDavid du Colombier */
4489ef1f84bSDavid du Colombier int
qdiscard(Queue * q,int len)4499ef1f84bSDavid du Colombier qdiscard(Queue *q, int len)
4509ef1f84bSDavid du Colombier {
4519ef1f84bSDavid du Colombier Block *b;
4529ef1f84bSDavid du Colombier int dowakeup, n, sofar;
4539ef1f84bSDavid du Colombier
4549ef1f84bSDavid du Colombier ilock(q);
4559ef1f84bSDavid du Colombier for(sofar = 0; sofar < len; sofar += n){
4569ef1f84bSDavid du Colombier b = q->bfirst;
4579ef1f84bSDavid du Colombier if(b == nil)
4589ef1f84bSDavid du Colombier break;
4599ef1f84bSDavid du Colombier QDEBUG checkb(b, "qdiscard");
4609ef1f84bSDavid du Colombier n = BLEN(b);
4619ef1f84bSDavid du Colombier if(n <= len - sofar){
4629ef1f84bSDavid du Colombier q->bfirst = b->next;
4639ef1f84bSDavid du Colombier b->next = 0;
4649ef1f84bSDavid du Colombier q->len -= BALLOC(b);
4659ef1f84bSDavid du Colombier q->dlen -= BLEN(b);
4669ef1f84bSDavid du Colombier freeb(b);
4679ef1f84bSDavid du Colombier } else {
4689ef1f84bSDavid du Colombier n = len - sofar;
4699ef1f84bSDavid du Colombier b->rp += n;
4709ef1f84bSDavid du Colombier q->dlen -= n;
4719ef1f84bSDavid du Colombier }
4729ef1f84bSDavid du Colombier }
4739ef1f84bSDavid du Colombier
4749ef1f84bSDavid du Colombier /*
4759ef1f84bSDavid du Colombier * if writer flow controlled, restart
4769ef1f84bSDavid du Colombier *
4779ef1f84bSDavid du Colombier * This used to be
4789ef1f84bSDavid du Colombier * q->len < q->limit/2
4799ef1f84bSDavid du Colombier * but it slows down tcp too much for certain write sizes.
4809ef1f84bSDavid du Colombier * I really don't understand it completely. It may be
4819ef1f84bSDavid du Colombier * due to the queue draining so fast that the transmission
4829ef1f84bSDavid du Colombier * stalls waiting for the app to produce more data. - presotto
4839ef1f84bSDavid du Colombier *
4849ef1f84bSDavid du Colombier * changed back from q->len < q->limit for reno tcp. - jmk
4859ef1f84bSDavid du Colombier */
4869ef1f84bSDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){
4879ef1f84bSDavid du Colombier q->state &= ~Qflow;
4889ef1f84bSDavid du Colombier dowakeup = 1;
4899ef1f84bSDavid du Colombier } else
4909ef1f84bSDavid du Colombier dowakeup = 0;
4919ef1f84bSDavid du Colombier
4929ef1f84bSDavid du Colombier iunlock(q);
4939ef1f84bSDavid du Colombier
4949ef1f84bSDavid du Colombier if(dowakeup)
4959ef1f84bSDavid du Colombier wakeup(&q->wr);
4969ef1f84bSDavid du Colombier
4979ef1f84bSDavid du Colombier return sofar;
4989ef1f84bSDavid du Colombier }
4999ef1f84bSDavid du Colombier
5009ef1f84bSDavid du Colombier /*
5019ef1f84bSDavid du Colombier * Interrupt level copy out of a queue, return # bytes copied.
5029ef1f84bSDavid du Colombier */
5039ef1f84bSDavid du Colombier int
qconsume(Queue * q,void * vp,int len)5049ef1f84bSDavid du Colombier qconsume(Queue *q, void *vp, int len)
5059ef1f84bSDavid du Colombier {
5069ef1f84bSDavid du Colombier Block *b;
5079ef1f84bSDavid du Colombier int n, dowakeup;
5089ef1f84bSDavid du Colombier uchar *p = vp;
5099ef1f84bSDavid du Colombier Block *tofree = nil;
5109ef1f84bSDavid du Colombier
5119ef1f84bSDavid du Colombier /* sync with qwrite */
5129ef1f84bSDavid du Colombier ilock(q);
5139ef1f84bSDavid du Colombier
5149ef1f84bSDavid du Colombier for(;;) {
5159ef1f84bSDavid du Colombier b = q->bfirst;
5169ef1f84bSDavid du Colombier if(b == 0){
5179ef1f84bSDavid du Colombier q->state |= Qstarve;
5189ef1f84bSDavid du Colombier iunlock(q);
5199ef1f84bSDavid du Colombier return -1;
5209ef1f84bSDavid du Colombier }
5219ef1f84bSDavid du Colombier QDEBUG checkb(b, "qconsume 1");
5229ef1f84bSDavid du Colombier
5239ef1f84bSDavid du Colombier n = BLEN(b);
5249ef1f84bSDavid du Colombier if(n > 0)
5259ef1f84bSDavid du Colombier break;
5269ef1f84bSDavid du Colombier q->bfirst = b->next;
5279ef1f84bSDavid du Colombier q->len -= BALLOC(b);
5289ef1f84bSDavid du Colombier
5299ef1f84bSDavid du Colombier /* remember to free this */
5309ef1f84bSDavid du Colombier b->next = tofree;
5319ef1f84bSDavid du Colombier tofree = b;
5329ef1f84bSDavid du Colombier };
5339ef1f84bSDavid du Colombier
5349ef1f84bSDavid du Colombier if(n < len)
5359ef1f84bSDavid du Colombier len = n;
5369ef1f84bSDavid du Colombier memmove(p, b->rp, len);
5379ef1f84bSDavid du Colombier consumecnt += n;
5389ef1f84bSDavid du Colombier b->rp += len;
5399ef1f84bSDavid du Colombier q->dlen -= len;
5409ef1f84bSDavid du Colombier
5419ef1f84bSDavid du Colombier /* discard the block if we're done with it */
5429ef1f84bSDavid du Colombier if((q->state & Qmsg) || len == n){
5439ef1f84bSDavid du Colombier q->bfirst = b->next;
5449ef1f84bSDavid du Colombier b->next = 0;
5459ef1f84bSDavid du Colombier q->len -= BALLOC(b);
5469ef1f84bSDavid du Colombier q->dlen -= BLEN(b);
5479ef1f84bSDavid du Colombier
5489ef1f84bSDavid du Colombier /* remember to free this */
5499ef1f84bSDavid du Colombier b->next = tofree;
5509ef1f84bSDavid du Colombier tofree = b;
5519ef1f84bSDavid du Colombier }
5529ef1f84bSDavid du Colombier
5539ef1f84bSDavid du Colombier /* if writer flow controlled, restart */
5549ef1f84bSDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){
5559ef1f84bSDavid du Colombier q->state &= ~Qflow;
5569ef1f84bSDavid du Colombier dowakeup = 1;
5579ef1f84bSDavid du Colombier } else
5589ef1f84bSDavid du Colombier dowakeup = 0;
5599ef1f84bSDavid du Colombier
5609ef1f84bSDavid du Colombier iunlock(q);
5619ef1f84bSDavid du Colombier
5629ef1f84bSDavid du Colombier if(dowakeup)
5639ef1f84bSDavid du Colombier wakeup(&q->wr);
5649ef1f84bSDavid du Colombier
5659ef1f84bSDavid du Colombier if(tofree != nil)
5669ef1f84bSDavid du Colombier freeblist(tofree);
5679ef1f84bSDavid du Colombier
5689ef1f84bSDavid du Colombier return len;
5699ef1f84bSDavid du Colombier }
5709ef1f84bSDavid du Colombier
5719ef1f84bSDavid du Colombier int
qpass(Queue * q,Block * b)5729ef1f84bSDavid du Colombier qpass(Queue *q, Block *b)
5739ef1f84bSDavid du Colombier {
5749ef1f84bSDavid du Colombier int dlen, len, dowakeup;
5759ef1f84bSDavid du Colombier
5769ef1f84bSDavid du Colombier /* sync with qread */
5779ef1f84bSDavid du Colombier dowakeup = 0;
5789ef1f84bSDavid du Colombier ilock(q);
5799ef1f84bSDavid du Colombier if(q->len >= q->limit){
5809ef1f84bSDavid du Colombier iunlock(q);
581*bccccb4aSDavid du Colombier freeblist(b);
5829ef1f84bSDavid du Colombier return -1;
5839ef1f84bSDavid du Colombier }
5849ef1f84bSDavid du Colombier if(q->state & Qclosed){
5859ef1f84bSDavid du Colombier len = BALLOC(b);
5869ef1f84bSDavid du Colombier iunlock(q);
587*bccccb4aSDavid du Colombier freeblist(b);
5889ef1f84bSDavid du Colombier return len;
5899ef1f84bSDavid du Colombier }
5909ef1f84bSDavid du Colombier
5919ef1f84bSDavid du Colombier /* add buffer to queue */
5929ef1f84bSDavid du Colombier if(q->bfirst)
5939ef1f84bSDavid du Colombier q->blast->next = b;
5949ef1f84bSDavid du Colombier else
5959ef1f84bSDavid du Colombier q->bfirst = b;
5969ef1f84bSDavid du Colombier len = BALLOC(b);
5979ef1f84bSDavid du Colombier dlen = BLEN(b);
5989ef1f84bSDavid du Colombier QDEBUG checkb(b, "qpass");
5999ef1f84bSDavid du Colombier while(b->next){
6009ef1f84bSDavid du Colombier b = b->next;
6019ef1f84bSDavid du Colombier QDEBUG checkb(b, "qpass");
6029ef1f84bSDavid du Colombier len += BALLOC(b);
6039ef1f84bSDavid du Colombier dlen += BLEN(b);
6049ef1f84bSDavid du Colombier }
6059ef1f84bSDavid du Colombier q->blast = b;
6069ef1f84bSDavid du Colombier q->len += len;
6079ef1f84bSDavid du Colombier q->dlen += dlen;
6089ef1f84bSDavid du Colombier
6099ef1f84bSDavid du Colombier if(q->len >= q->limit/2)
6109ef1f84bSDavid du Colombier q->state |= Qflow;
6119ef1f84bSDavid du Colombier
6129ef1f84bSDavid du Colombier if(q->state & Qstarve){
6139ef1f84bSDavid du Colombier q->state &= ~Qstarve;
6149ef1f84bSDavid du Colombier dowakeup = 1;
6159ef1f84bSDavid du Colombier }
6169ef1f84bSDavid du Colombier iunlock(q);
6179ef1f84bSDavid du Colombier
6189ef1f84bSDavid du Colombier if(dowakeup)
6199ef1f84bSDavid du Colombier wakeup(&q->rr);
6209ef1f84bSDavid du Colombier
6219ef1f84bSDavid du Colombier return len;
6229ef1f84bSDavid du Colombier }
6239ef1f84bSDavid du Colombier
6249ef1f84bSDavid du Colombier int
qpassnolim(Queue * q,Block * b)6259ef1f84bSDavid du Colombier qpassnolim(Queue *q, Block *b)
6269ef1f84bSDavid du Colombier {
6279ef1f84bSDavid du Colombier int dlen, len, dowakeup;
6289ef1f84bSDavid du Colombier
6299ef1f84bSDavid du Colombier /* sync with qread */
6309ef1f84bSDavid du Colombier dowakeup = 0;
6319ef1f84bSDavid du Colombier ilock(q);
6329ef1f84bSDavid du Colombier
6339ef1f84bSDavid du Colombier if(q->state & Qclosed){
6349ef1f84bSDavid du Colombier len = BALLOC(b);
6359ef1f84bSDavid du Colombier iunlock(q);
636*bccccb4aSDavid du Colombier freeblist(b);
6379ef1f84bSDavid du Colombier return len;
6389ef1f84bSDavid du Colombier }
6399ef1f84bSDavid du Colombier
6409ef1f84bSDavid du Colombier /* add buffer to queue */
6419ef1f84bSDavid du Colombier if(q->bfirst)
6429ef1f84bSDavid du Colombier q->blast->next = b;
6439ef1f84bSDavid du Colombier else
6449ef1f84bSDavid du Colombier q->bfirst = b;
6459ef1f84bSDavid du Colombier len = BALLOC(b);
6469ef1f84bSDavid du Colombier dlen = BLEN(b);
6479ef1f84bSDavid du Colombier QDEBUG checkb(b, "qpass");
6489ef1f84bSDavid du Colombier while(b->next){
6499ef1f84bSDavid du Colombier b = b->next;
6509ef1f84bSDavid du Colombier QDEBUG checkb(b, "qpass");
6519ef1f84bSDavid du Colombier len += BALLOC(b);
6529ef1f84bSDavid du Colombier dlen += BLEN(b);
6539ef1f84bSDavid du Colombier }
6549ef1f84bSDavid du Colombier q->blast = b;
6559ef1f84bSDavid du Colombier q->len += len;
6569ef1f84bSDavid du Colombier q->dlen += dlen;
6579ef1f84bSDavid du Colombier
6589ef1f84bSDavid du Colombier if(q->len >= q->limit/2)
6599ef1f84bSDavid du Colombier q->state |= Qflow;
6609ef1f84bSDavid du Colombier
6619ef1f84bSDavid du Colombier if(q->state & Qstarve){
6629ef1f84bSDavid du Colombier q->state &= ~Qstarve;
6639ef1f84bSDavid du Colombier dowakeup = 1;
6649ef1f84bSDavid du Colombier }
6659ef1f84bSDavid du Colombier iunlock(q);
6669ef1f84bSDavid du Colombier
6679ef1f84bSDavid du Colombier if(dowakeup)
6689ef1f84bSDavid du Colombier wakeup(&q->rr);
6699ef1f84bSDavid du Colombier
6709ef1f84bSDavid du Colombier return len;
6719ef1f84bSDavid du Colombier }
6729ef1f84bSDavid du Colombier
6739ef1f84bSDavid du Colombier /*
6749ef1f84bSDavid du Colombier * if the allocated space is way out of line with the used
6759ef1f84bSDavid du Colombier * space, reallocate to a smaller block
6769ef1f84bSDavid du Colombier */
6779ef1f84bSDavid du Colombier Block*
packblock(Block * bp)6789ef1f84bSDavid du Colombier packblock(Block *bp)
6799ef1f84bSDavid du Colombier {
6809ef1f84bSDavid du Colombier Block **l, *nbp;
6819ef1f84bSDavid du Colombier int n;
6829ef1f84bSDavid du Colombier
6839ef1f84bSDavid du Colombier for(l = &bp; *l; l = &(*l)->next){
6849ef1f84bSDavid du Colombier nbp = *l;
6859ef1f84bSDavid du Colombier n = BLEN(nbp);
6869ef1f84bSDavid du Colombier if((n<<2) < BALLOC(nbp)){
6879ef1f84bSDavid du Colombier *l = allocb(n);
6889ef1f84bSDavid du Colombier memmove((*l)->wp, nbp->rp, n);
6899ef1f84bSDavid du Colombier (*l)->wp += n;
6909ef1f84bSDavid du Colombier (*l)->next = nbp->next;
6919ef1f84bSDavid du Colombier freeb(nbp);
6929ef1f84bSDavid du Colombier }
6939ef1f84bSDavid du Colombier }
6949ef1f84bSDavid du Colombier
6959ef1f84bSDavid du Colombier return bp;
6969ef1f84bSDavid du Colombier }
6979ef1f84bSDavid du Colombier
6989ef1f84bSDavid du Colombier int
qproduce(Queue * q,void * vp,int len)6999ef1f84bSDavid du Colombier qproduce(Queue *q, void *vp, int len)
7009ef1f84bSDavid du Colombier {
7019ef1f84bSDavid du Colombier Block *b;
7029ef1f84bSDavid du Colombier int dowakeup;
7039ef1f84bSDavid du Colombier uchar *p = vp;
7049ef1f84bSDavid du Colombier
7059ef1f84bSDavid du Colombier /* sync with qread */
7069ef1f84bSDavid du Colombier dowakeup = 0;
7079ef1f84bSDavid du Colombier ilock(q);
7089ef1f84bSDavid du Colombier
7099ef1f84bSDavid du Colombier /* no waiting receivers, room in buffer? */
7109ef1f84bSDavid du Colombier if(q->len >= q->limit){
7119ef1f84bSDavid du Colombier q->state |= Qflow;
7129ef1f84bSDavid du Colombier iunlock(q);
7139ef1f84bSDavid du Colombier return -1;
7149ef1f84bSDavid du Colombier }
7159ef1f84bSDavid du Colombier
7169ef1f84bSDavid du Colombier /* save in buffer */
7179ef1f84bSDavid du Colombier b = iallocb(len);
7189ef1f84bSDavid du Colombier if(b == 0){
7199ef1f84bSDavid du Colombier iunlock(q);
7209ef1f84bSDavid du Colombier return 0;
7219ef1f84bSDavid du Colombier }
7229ef1f84bSDavid du Colombier memmove(b->wp, p, len);
7239ef1f84bSDavid du Colombier producecnt += len;
7249ef1f84bSDavid du Colombier b->wp += len;
7259ef1f84bSDavid du Colombier if(q->bfirst)
7269ef1f84bSDavid du Colombier q->blast->next = b;
7279ef1f84bSDavid du Colombier else
7289ef1f84bSDavid du Colombier q->bfirst = b;
7299ef1f84bSDavid du Colombier q->blast = b;
7309ef1f84bSDavid du Colombier /* b->next = 0; done by iallocb() */
7319ef1f84bSDavid du Colombier q->len += BALLOC(b);
7329ef1f84bSDavid du Colombier q->dlen += BLEN(b);
7339ef1f84bSDavid du Colombier QDEBUG checkb(b, "qproduce");
7349ef1f84bSDavid du Colombier
7359ef1f84bSDavid du Colombier if(q->state & Qstarve){
7369ef1f84bSDavid du Colombier q->state &= ~Qstarve;
7379ef1f84bSDavid du Colombier dowakeup = 1;
7389ef1f84bSDavid du Colombier }
7399ef1f84bSDavid du Colombier
7409ef1f84bSDavid du Colombier if(q->len >= q->limit)
7419ef1f84bSDavid du Colombier q->state |= Qflow;
7429ef1f84bSDavid du Colombier iunlock(q);
7439ef1f84bSDavid du Colombier
7449ef1f84bSDavid du Colombier if(dowakeup)
7459ef1f84bSDavid du Colombier wakeup(&q->rr);
7469ef1f84bSDavid du Colombier
7479ef1f84bSDavid du Colombier return len;
7489ef1f84bSDavid du Colombier }
7499ef1f84bSDavid du Colombier
7509ef1f84bSDavid du Colombier /*
7519ef1f84bSDavid du Colombier * copy from offset in the queue
7529ef1f84bSDavid du Colombier */
7539ef1f84bSDavid du Colombier Block*
qcopy(Queue * q,int len,ulong offset)7549ef1f84bSDavid du Colombier qcopy(Queue *q, int len, ulong offset)
7559ef1f84bSDavid du Colombier {
7569ef1f84bSDavid du Colombier int sofar;
7579ef1f84bSDavid du Colombier int n;
7589ef1f84bSDavid du Colombier Block *b, *nb;
7599ef1f84bSDavid du Colombier uchar *p;
7609ef1f84bSDavid du Colombier
7619ef1f84bSDavid du Colombier nb = allocb(len);
7629ef1f84bSDavid du Colombier
7639ef1f84bSDavid du Colombier ilock(q);
7649ef1f84bSDavid du Colombier
7659ef1f84bSDavid du Colombier /* go to offset */
7669ef1f84bSDavid du Colombier b = q->bfirst;
7679ef1f84bSDavid du Colombier for(sofar = 0; ; sofar += n){
7689ef1f84bSDavid du Colombier if(b == nil){
7699ef1f84bSDavid du Colombier iunlock(q);
7709ef1f84bSDavid du Colombier return nb;
7719ef1f84bSDavid du Colombier }
7729ef1f84bSDavid du Colombier n = BLEN(b);
7739ef1f84bSDavid du Colombier if(sofar + n > offset){
7749ef1f84bSDavid du Colombier p = b->rp + offset - sofar;
7759ef1f84bSDavid du Colombier n -= offset - sofar;
7769ef1f84bSDavid du Colombier break;
7779ef1f84bSDavid du Colombier }
7789ef1f84bSDavid du Colombier QDEBUG checkb(b, "qcopy");
7799ef1f84bSDavid du Colombier b = b->next;
7809ef1f84bSDavid du Colombier }
7819ef1f84bSDavid du Colombier
7829ef1f84bSDavid du Colombier /* copy bytes from there */
7839ef1f84bSDavid du Colombier for(sofar = 0; sofar < len;){
7849ef1f84bSDavid du Colombier if(n > len - sofar)
7859ef1f84bSDavid du Colombier n = len - sofar;
7869ef1f84bSDavid du Colombier memmove(nb->wp, p, n);
7879ef1f84bSDavid du Colombier qcopycnt += n;
7889ef1f84bSDavid du Colombier sofar += n;
7899ef1f84bSDavid du Colombier nb->wp += n;
7909ef1f84bSDavid du Colombier b = b->next;
7919ef1f84bSDavid du Colombier if(b == nil)
7929ef1f84bSDavid du Colombier break;
7939ef1f84bSDavid du Colombier n = BLEN(b);
7949ef1f84bSDavid du Colombier p = b->rp;
7959ef1f84bSDavid du Colombier }
7969ef1f84bSDavid du Colombier iunlock(q);
7979ef1f84bSDavid du Colombier
7989ef1f84bSDavid du Colombier return nb;
7999ef1f84bSDavid du Colombier }
8009ef1f84bSDavid du Colombier
8019ef1f84bSDavid du Colombier /*
8029ef1f84bSDavid du Colombier * called by non-interrupt code
8039ef1f84bSDavid du Colombier */
8049ef1f84bSDavid du Colombier Queue*
qopen(int limit,int msg,void (* kick)(void *),void * arg)8059ef1f84bSDavid du Colombier qopen(int limit, int msg, void (*kick)(void*), void *arg)
8069ef1f84bSDavid du Colombier {
8079ef1f84bSDavid du Colombier Queue *q;
8089ef1f84bSDavid du Colombier
8099ef1f84bSDavid du Colombier q = malloc(sizeof(Queue));
8109ef1f84bSDavid du Colombier if(q == 0)
8119ef1f84bSDavid du Colombier return 0;
8129ef1f84bSDavid du Colombier
8139ef1f84bSDavid du Colombier q->limit = q->inilim = limit;
8149ef1f84bSDavid du Colombier q->kick = kick;
8159ef1f84bSDavid du Colombier q->arg = arg;
8169ef1f84bSDavid du Colombier q->state = msg;
8179ef1f84bSDavid du Colombier
8189ef1f84bSDavid du Colombier q->state |= Qstarve;
8199ef1f84bSDavid du Colombier q->eof = 0;
8209ef1f84bSDavid du Colombier q->noblock = 0;
8219ef1f84bSDavid du Colombier
8229ef1f84bSDavid du Colombier return q;
8239ef1f84bSDavid du Colombier }
8249ef1f84bSDavid du Colombier
8259ef1f84bSDavid du Colombier /* open a queue to be bypassed */
8269ef1f84bSDavid du Colombier Queue*
qbypass(void (* bypass)(void *,Block *),void * arg)8279ef1f84bSDavid du Colombier qbypass(void (*bypass)(void*, Block*), void *arg)
8289ef1f84bSDavid du Colombier {
8299ef1f84bSDavid du Colombier Queue *q;
8309ef1f84bSDavid du Colombier
8319ef1f84bSDavid du Colombier q = malloc(sizeof(Queue));
8329ef1f84bSDavid du Colombier if(q == 0)
8339ef1f84bSDavid du Colombier return 0;
8349ef1f84bSDavid du Colombier
8359ef1f84bSDavid du Colombier q->limit = 0;
8369ef1f84bSDavid du Colombier q->arg = arg;
8379ef1f84bSDavid du Colombier q->bypass = bypass;
8389ef1f84bSDavid du Colombier q->state = 0;
8399ef1f84bSDavid du Colombier
8409ef1f84bSDavid du Colombier return q;
8419ef1f84bSDavid du Colombier }
8429ef1f84bSDavid du Colombier
8439ef1f84bSDavid du Colombier static int
notempty(void * a)8449ef1f84bSDavid du Colombier notempty(void *a)
8459ef1f84bSDavid du Colombier {
8469ef1f84bSDavid du Colombier Queue *q = a;
8479ef1f84bSDavid du Colombier
8489ef1f84bSDavid du Colombier return (q->state & Qclosed) || q->bfirst != 0;
8499ef1f84bSDavid du Colombier }
8509ef1f84bSDavid du Colombier
8519ef1f84bSDavid du Colombier /*
8529ef1f84bSDavid du Colombier * wait for the queue to be non-empty or closed.
8539ef1f84bSDavid du Colombier * called with q ilocked.
8549ef1f84bSDavid du Colombier */
8559ef1f84bSDavid du Colombier static int
qwait(Queue * q)8569ef1f84bSDavid du Colombier qwait(Queue *q)
8579ef1f84bSDavid du Colombier {
8589ef1f84bSDavid du Colombier /* wait for data */
8599ef1f84bSDavid du Colombier for(;;){
8609ef1f84bSDavid du Colombier if(q->bfirst != nil)
8619ef1f84bSDavid du Colombier break;
8629ef1f84bSDavid du Colombier
8639ef1f84bSDavid du Colombier if(q->state & Qclosed){
8649ef1f84bSDavid du Colombier if(++q->eof > 3)
8659ef1f84bSDavid du Colombier return -1;
8669ef1f84bSDavid du Colombier if(*q->err && strcmp(q->err, Ehungup) != 0)
8679ef1f84bSDavid du Colombier return -1;
8689ef1f84bSDavid du Colombier return 0;
8699ef1f84bSDavid du Colombier }
8709ef1f84bSDavid du Colombier
8719ef1f84bSDavid du Colombier q->state |= Qstarve; /* flag requesting producer to wake me */
8729ef1f84bSDavid du Colombier iunlock(q);
8739ef1f84bSDavid du Colombier sleep(&q->rr, notempty, q);
8749ef1f84bSDavid du Colombier ilock(q);
8759ef1f84bSDavid du Colombier }
8769ef1f84bSDavid du Colombier return 1;
8779ef1f84bSDavid du Colombier }
8789ef1f84bSDavid du Colombier
8799ef1f84bSDavid du Colombier /*
8809ef1f84bSDavid du Colombier * add a block list to a queue
8819ef1f84bSDavid du Colombier */
8829ef1f84bSDavid du Colombier void
qaddlist(Queue * q,Block * b)8839ef1f84bSDavid du Colombier qaddlist(Queue *q, Block *b)
8849ef1f84bSDavid du Colombier {
8859ef1f84bSDavid du Colombier /* queue the block */
8869ef1f84bSDavid du Colombier if(q->bfirst)
8879ef1f84bSDavid du Colombier q->blast->next = b;
8889ef1f84bSDavid du Colombier else
8899ef1f84bSDavid du Colombier q->bfirst = b;
8909ef1f84bSDavid du Colombier q->len += blockalloclen(b);
8919ef1f84bSDavid du Colombier q->dlen += blocklen(b);
8929ef1f84bSDavid du Colombier while(b->next)
8939ef1f84bSDavid du Colombier b = b->next;
8949ef1f84bSDavid du Colombier q->blast = b;
8959ef1f84bSDavid du Colombier }
8969ef1f84bSDavid du Colombier
8979ef1f84bSDavid du Colombier /*
8989ef1f84bSDavid du Colombier * called with q ilocked
8999ef1f84bSDavid du Colombier */
9009ef1f84bSDavid du Colombier Block*
qremove(Queue * q)9019ef1f84bSDavid du Colombier qremove(Queue *q)
9029ef1f84bSDavid du Colombier {
9039ef1f84bSDavid du Colombier Block *b;
9049ef1f84bSDavid du Colombier
9059ef1f84bSDavid du Colombier b = q->bfirst;
9069ef1f84bSDavid du Colombier if(b == nil)
9079ef1f84bSDavid du Colombier return nil;
9089ef1f84bSDavid du Colombier q->bfirst = b->next;
9099ef1f84bSDavid du Colombier b->next = nil;
9109ef1f84bSDavid du Colombier q->dlen -= BLEN(b);
9119ef1f84bSDavid du Colombier q->len -= BALLOC(b);
9129ef1f84bSDavid du Colombier QDEBUG checkb(b, "qremove");
9139ef1f84bSDavid du Colombier return b;
9149ef1f84bSDavid du Colombier }
9159ef1f84bSDavid du Colombier
9169ef1f84bSDavid du Colombier /*
9179ef1f84bSDavid du Colombier * copy the contents of a string of blocks into
9189ef1f84bSDavid du Colombier * memory. emptied blocks are freed. return
9199ef1f84bSDavid du Colombier * pointer to first unconsumed block.
9209ef1f84bSDavid du Colombier */
9219ef1f84bSDavid du Colombier Block*
bl2mem(uchar * p,Block * b,int n)9229ef1f84bSDavid du Colombier bl2mem(uchar *p, Block *b, int n)
9239ef1f84bSDavid du Colombier {
9249ef1f84bSDavid du Colombier int i;
9259ef1f84bSDavid du Colombier Block *next;
9269ef1f84bSDavid du Colombier
9279ef1f84bSDavid du Colombier for(; b != nil; b = next){
9289ef1f84bSDavid du Colombier i = BLEN(b);
9299ef1f84bSDavid du Colombier if(i > n){
9309ef1f84bSDavid du Colombier memmove(p, b->rp, n);
9319ef1f84bSDavid du Colombier b->rp += n;
9329ef1f84bSDavid du Colombier return b;
9339ef1f84bSDavid du Colombier }
9349ef1f84bSDavid du Colombier memmove(p, b->rp, i);
9359ef1f84bSDavid du Colombier n -= i;
9369ef1f84bSDavid du Colombier p += i;
9379ef1f84bSDavid du Colombier b->rp += i;
9389ef1f84bSDavid du Colombier next = b->next;
9399ef1f84bSDavid du Colombier freeb(b);
9409ef1f84bSDavid du Colombier }
9419ef1f84bSDavid du Colombier return nil;
9429ef1f84bSDavid du Colombier }
9439ef1f84bSDavid du Colombier
9449ef1f84bSDavid du Colombier /*
9459ef1f84bSDavid du Colombier * copy the contents of memory into a string of blocks.
9469ef1f84bSDavid du Colombier * return nil on error.
9479ef1f84bSDavid du Colombier */
9489ef1f84bSDavid du Colombier Block*
mem2bl(uchar * p,int len)9499ef1f84bSDavid du Colombier mem2bl(uchar *p, int len)
9509ef1f84bSDavid du Colombier {
9519ef1f84bSDavid du Colombier int n;
9529ef1f84bSDavid du Colombier Block *b, *first, **l;
9539ef1f84bSDavid du Colombier
9549ef1f84bSDavid du Colombier first = nil;
9559ef1f84bSDavid du Colombier l = &first;
9569ef1f84bSDavid du Colombier if(waserror()){
9579ef1f84bSDavid du Colombier freeblist(first);
9589ef1f84bSDavid du Colombier nexterror();
9599ef1f84bSDavid du Colombier }
9609ef1f84bSDavid du Colombier do {
9619ef1f84bSDavid du Colombier n = len;
9629ef1f84bSDavid du Colombier if(n > Maxatomic)
9639ef1f84bSDavid du Colombier n = Maxatomic;
9649ef1f84bSDavid du Colombier
9659ef1f84bSDavid du Colombier *l = b = allocb(n);
9669ef1f84bSDavid du Colombier setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
9679ef1f84bSDavid du Colombier memmove(b->wp, p, n);
9689ef1f84bSDavid du Colombier b->wp += n;
9699ef1f84bSDavid du Colombier p += n;
9709ef1f84bSDavid du Colombier len -= n;
9719ef1f84bSDavid du Colombier l = &b->next;
9729ef1f84bSDavid du Colombier } while(len > 0);
9739ef1f84bSDavid du Colombier poperror();
9749ef1f84bSDavid du Colombier
9759ef1f84bSDavid du Colombier return first;
9769ef1f84bSDavid du Colombier }
9779ef1f84bSDavid du Colombier
9789ef1f84bSDavid du Colombier /*
9799ef1f84bSDavid du Colombier * put a block back to the front of the queue
9809ef1f84bSDavid du Colombier * called with q ilocked
9819ef1f84bSDavid du Colombier */
9829ef1f84bSDavid du Colombier void
qputback(Queue * q,Block * b)9839ef1f84bSDavid du Colombier qputback(Queue *q, Block *b)
9849ef1f84bSDavid du Colombier {
9859ef1f84bSDavid du Colombier b->next = q->bfirst;
9869ef1f84bSDavid du Colombier if(q->bfirst == nil)
9879ef1f84bSDavid du Colombier q->blast = b;
9889ef1f84bSDavid du Colombier q->bfirst = b;
9899ef1f84bSDavid du Colombier q->len += BALLOC(b);
9909ef1f84bSDavid du Colombier q->dlen += BLEN(b);
9919ef1f84bSDavid du Colombier }
9929ef1f84bSDavid du Colombier
9939ef1f84bSDavid du Colombier /*
9949ef1f84bSDavid du Colombier * flow control, get producer going again
9959ef1f84bSDavid du Colombier * called with q ilocked
9969ef1f84bSDavid du Colombier */
9979ef1f84bSDavid du Colombier static void
qwakeup_iunlock(Queue * q)9989ef1f84bSDavid du Colombier qwakeup_iunlock(Queue *q)
9999ef1f84bSDavid du Colombier {
10009ef1f84bSDavid du Colombier int dowakeup;
10019ef1f84bSDavid du Colombier
10029ef1f84bSDavid du Colombier /* if writer flow controlled, restart */
10039ef1f84bSDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){
10049ef1f84bSDavid du Colombier q->state &= ~Qflow;
10059ef1f84bSDavid du Colombier dowakeup = 1;
10069ef1f84bSDavid du Colombier }
10079ef1f84bSDavid du Colombier else
10089ef1f84bSDavid du Colombier dowakeup = 0;
10099ef1f84bSDavid du Colombier
10109ef1f84bSDavid du Colombier iunlock(q);
10119ef1f84bSDavid du Colombier
10129ef1f84bSDavid du Colombier /* wakeup flow controlled writers */
10139ef1f84bSDavid du Colombier if(dowakeup){
10149ef1f84bSDavid du Colombier if(q->kick)
10159ef1f84bSDavid du Colombier q->kick(q->arg);
10169ef1f84bSDavid du Colombier wakeup(&q->wr);
10179ef1f84bSDavid du Colombier }
10189ef1f84bSDavid du Colombier }
10199ef1f84bSDavid du Colombier
10209ef1f84bSDavid du Colombier /*
10219ef1f84bSDavid du Colombier * get next block from a queue (up to a limit)
10229ef1f84bSDavid du Colombier */
10239ef1f84bSDavid du Colombier Block*
qbread(Queue * q,int len)10249ef1f84bSDavid du Colombier qbread(Queue *q, int len)
10259ef1f84bSDavid du Colombier {
10269ef1f84bSDavid du Colombier Block *b, *nb;
10279ef1f84bSDavid du Colombier int n;
10289ef1f84bSDavid du Colombier
10299ef1f84bSDavid du Colombier qlock(&q->rlock);
10309ef1f84bSDavid du Colombier if(waserror()){
10319ef1f84bSDavid du Colombier qunlock(&q->rlock);
10329ef1f84bSDavid du Colombier nexterror();
10339ef1f84bSDavid du Colombier }
10349ef1f84bSDavid du Colombier
10359ef1f84bSDavid du Colombier ilock(q);
10369ef1f84bSDavid du Colombier switch(qwait(q)){
10379ef1f84bSDavid du Colombier case 0:
10389ef1f84bSDavid du Colombier /* queue closed */
10399ef1f84bSDavid du Colombier iunlock(q);
10409ef1f84bSDavid du Colombier qunlock(&q->rlock);
10419ef1f84bSDavid du Colombier poperror();
10429ef1f84bSDavid du Colombier return nil;
10439ef1f84bSDavid du Colombier case -1:
10449ef1f84bSDavid du Colombier /* multiple reads on a closed queue */
10459ef1f84bSDavid du Colombier iunlock(q);
10469ef1f84bSDavid du Colombier error(q->err);
10479ef1f84bSDavid du Colombier }
10489ef1f84bSDavid du Colombier
10499ef1f84bSDavid du Colombier /* if we get here, there's at least one block in the queue */
10509ef1f84bSDavid du Colombier b = qremove(q);
10519ef1f84bSDavid du Colombier n = BLEN(b);
10529ef1f84bSDavid du Colombier
10539ef1f84bSDavid du Colombier /* split block if it's too big and this is not a message queue */
10549ef1f84bSDavid du Colombier nb = b;
10559ef1f84bSDavid du Colombier if(n > len){
10569ef1f84bSDavid du Colombier if((q->state&Qmsg) == 0){
10579ef1f84bSDavid du Colombier n -= len;
10589ef1f84bSDavid du Colombier b = allocb(n);
10599ef1f84bSDavid du Colombier memmove(b->wp, nb->rp+len, n);
10609ef1f84bSDavid du Colombier b->wp += n;
10619ef1f84bSDavid du Colombier qputback(q, b);
10629ef1f84bSDavid du Colombier }
10639ef1f84bSDavid du Colombier nb->wp = nb->rp + len;
10649ef1f84bSDavid du Colombier }
10659ef1f84bSDavid du Colombier
10669ef1f84bSDavid du Colombier /* restart producer */
10679ef1f84bSDavid du Colombier qwakeup_iunlock(q);
10689ef1f84bSDavid du Colombier
10699ef1f84bSDavid du Colombier poperror();
10709ef1f84bSDavid du Colombier qunlock(&q->rlock);
10719ef1f84bSDavid du Colombier return nb;
10729ef1f84bSDavid du Colombier }
10739ef1f84bSDavid du Colombier
10749ef1f84bSDavid du Colombier /*
10759ef1f84bSDavid du Colombier * read a queue. if no data is queued, post a Block
10769ef1f84bSDavid du Colombier * and wait on its Rendez.
10779ef1f84bSDavid du Colombier */
10789ef1f84bSDavid du Colombier long
qread(Queue * q,void * vp,int len)10799ef1f84bSDavid du Colombier qread(Queue *q, void *vp, int len)
10809ef1f84bSDavid du Colombier {
10819ef1f84bSDavid du Colombier Block *b, *first, **l;
10829ef1f84bSDavid du Colombier int blen, n;
10839ef1f84bSDavid du Colombier
10849ef1f84bSDavid du Colombier qlock(&q->rlock);
10859ef1f84bSDavid du Colombier if(waserror()){
10869ef1f84bSDavid du Colombier qunlock(&q->rlock);
10879ef1f84bSDavid du Colombier nexterror();
10889ef1f84bSDavid du Colombier }
10899ef1f84bSDavid du Colombier
10909ef1f84bSDavid du Colombier ilock(q);
10919ef1f84bSDavid du Colombier again:
10929ef1f84bSDavid du Colombier switch(qwait(q)){
10939ef1f84bSDavid du Colombier case 0:
10949ef1f84bSDavid du Colombier /* queue closed */
10959ef1f84bSDavid du Colombier iunlock(q);
10969ef1f84bSDavid du Colombier qunlock(&q->rlock);
10979ef1f84bSDavid du Colombier poperror();
10989ef1f84bSDavid du Colombier return 0;
10999ef1f84bSDavid du Colombier case -1:
11009ef1f84bSDavid du Colombier /* multiple reads on a closed queue */
11019ef1f84bSDavid du Colombier iunlock(q);
11029ef1f84bSDavid du Colombier error(q->err);
11039ef1f84bSDavid du Colombier }
11049ef1f84bSDavid du Colombier
11059ef1f84bSDavid du Colombier /* if we get here, there's at least one block in the queue */
11069ef1f84bSDavid du Colombier if(q->state & Qcoalesce){
11079ef1f84bSDavid du Colombier /* when coalescing, 0 length blocks just go away */
11089ef1f84bSDavid du Colombier b = q->bfirst;
11099ef1f84bSDavid du Colombier if(BLEN(b) <= 0){
11109ef1f84bSDavid du Colombier freeb(qremove(q));
11119ef1f84bSDavid du Colombier goto again;
11129ef1f84bSDavid du Colombier }
11139ef1f84bSDavid du Colombier
11149ef1f84bSDavid du Colombier /* grab the first block plus as many
11159ef1f84bSDavid du Colombier * following blocks as will completely
11169ef1f84bSDavid du Colombier * fit in the read.
11179ef1f84bSDavid du Colombier */
11189ef1f84bSDavid du Colombier n = 0;
11199ef1f84bSDavid du Colombier l = &first;
11209ef1f84bSDavid du Colombier blen = BLEN(b);
11219ef1f84bSDavid du Colombier for(;;) {
11229ef1f84bSDavid du Colombier *l = qremove(q);
11239ef1f84bSDavid du Colombier l = &b->next;
11249ef1f84bSDavid du Colombier n += blen;
11259ef1f84bSDavid du Colombier
11269ef1f84bSDavid du Colombier b = q->bfirst;
11279ef1f84bSDavid du Colombier if(b == nil)
11289ef1f84bSDavid du Colombier break;
11299ef1f84bSDavid du Colombier blen = BLEN(b);
11309ef1f84bSDavid du Colombier if(n+blen > len)
11319ef1f84bSDavid du Colombier break;
11329ef1f84bSDavid du Colombier }
11339ef1f84bSDavid du Colombier } else {
11349ef1f84bSDavid du Colombier first = qremove(q);
11359ef1f84bSDavid du Colombier n = BLEN(first);
11369ef1f84bSDavid du Colombier }
11379ef1f84bSDavid du Colombier
11389ef1f84bSDavid du Colombier /* copy to user space outside of the ilock */
11399ef1f84bSDavid du Colombier iunlock(q);
11409ef1f84bSDavid du Colombier b = bl2mem(vp, first, len);
11419ef1f84bSDavid du Colombier ilock(q);
11429ef1f84bSDavid du Colombier
11439ef1f84bSDavid du Colombier /* take care of any left over partial block */
11449ef1f84bSDavid du Colombier if(b != nil){
11459ef1f84bSDavid du Colombier n -= BLEN(b);
11469ef1f84bSDavid du Colombier if(q->state & Qmsg)
11479ef1f84bSDavid du Colombier freeb(b);
11489ef1f84bSDavid du Colombier else
11499ef1f84bSDavid du Colombier qputback(q, b);
11509ef1f84bSDavid du Colombier }
11519ef1f84bSDavid du Colombier
11529ef1f84bSDavid du Colombier /* restart producer */
11539ef1f84bSDavid du Colombier qwakeup_iunlock(q);
11549ef1f84bSDavid du Colombier
11559ef1f84bSDavid du Colombier poperror();
11569ef1f84bSDavid du Colombier qunlock(&q->rlock);
11579ef1f84bSDavid du Colombier return n;
11589ef1f84bSDavid du Colombier }
11599ef1f84bSDavid du Colombier
11609ef1f84bSDavid du Colombier static int
qnotfull(void * a)11619ef1f84bSDavid du Colombier qnotfull(void *a)
11629ef1f84bSDavid du Colombier {
11639ef1f84bSDavid du Colombier Queue *q = a;
11649ef1f84bSDavid du Colombier
11659ef1f84bSDavid du Colombier return q->len < q->limit || (q->state & Qclosed);
11669ef1f84bSDavid du Colombier }
11679ef1f84bSDavid du Colombier
11689ef1f84bSDavid du Colombier ulong noblockcnt;
11699ef1f84bSDavid du Colombier
11709ef1f84bSDavid du Colombier /*
11719ef1f84bSDavid du Colombier * add a block to a queue obeying flow control
11729ef1f84bSDavid du Colombier */
11739ef1f84bSDavid du Colombier long
qbwrite(Queue * q,Block * b)11749ef1f84bSDavid du Colombier qbwrite(Queue *q, Block *b)
11759ef1f84bSDavid du Colombier {
11769ef1f84bSDavid du Colombier int n, dowakeup;
11779ef1f84bSDavid du Colombier Proc *p;
11789ef1f84bSDavid du Colombier
11799ef1f84bSDavid du Colombier n = BLEN(b);
11809ef1f84bSDavid du Colombier
11819ef1f84bSDavid du Colombier if(q->bypass){
11829ef1f84bSDavid du Colombier (*q->bypass)(q->arg, b);
11839ef1f84bSDavid du Colombier return n;
11849ef1f84bSDavid du Colombier }
11859ef1f84bSDavid du Colombier
11869ef1f84bSDavid du Colombier dowakeup = 0;
11879ef1f84bSDavid du Colombier qlock(&q->wlock);
11889ef1f84bSDavid du Colombier if(waserror()){
11899ef1f84bSDavid du Colombier if(b != nil)
11909ef1f84bSDavid du Colombier freeb(b);
11919ef1f84bSDavid du Colombier qunlock(&q->wlock);
11929ef1f84bSDavid du Colombier nexterror();
11939ef1f84bSDavid du Colombier }
11949ef1f84bSDavid du Colombier
11959ef1f84bSDavid du Colombier ilock(q);
11969ef1f84bSDavid du Colombier
11979ef1f84bSDavid du Colombier /* give up if the queue is closed */
11989ef1f84bSDavid du Colombier if(q->state & Qclosed){
11999ef1f84bSDavid du Colombier iunlock(q);
12009ef1f84bSDavid du Colombier error(q->err);
12019ef1f84bSDavid du Colombier }
12029ef1f84bSDavid du Colombier
12039ef1f84bSDavid du Colombier /* if nonblocking, don't queue over the limit */
12049ef1f84bSDavid du Colombier if(q->len >= q->limit){
12059ef1f84bSDavid du Colombier if(q->noblock){
12069ef1f84bSDavid du Colombier iunlock(q);
12079ef1f84bSDavid du Colombier freeb(b);
12089ef1f84bSDavid du Colombier noblockcnt += n;
12099ef1f84bSDavid du Colombier qunlock(&q->wlock);
12109ef1f84bSDavid du Colombier poperror();
12119ef1f84bSDavid du Colombier return n;
12129ef1f84bSDavid du Colombier }
12139ef1f84bSDavid du Colombier }
12149ef1f84bSDavid du Colombier
12159ef1f84bSDavid du Colombier /* queue the block */
12169ef1f84bSDavid du Colombier if(q->bfirst)
12179ef1f84bSDavid du Colombier q->blast->next = b;
12189ef1f84bSDavid du Colombier else
12199ef1f84bSDavid du Colombier q->bfirst = b;
12209ef1f84bSDavid du Colombier q->blast = b;
12219ef1f84bSDavid du Colombier b->next = 0;
12229ef1f84bSDavid du Colombier q->len += BALLOC(b);
12239ef1f84bSDavid du Colombier q->dlen += n;
12249ef1f84bSDavid du Colombier QDEBUG checkb(b, "qbwrite");
12259ef1f84bSDavid du Colombier b = nil;
12269ef1f84bSDavid du Colombier
12279ef1f84bSDavid du Colombier /* make sure other end gets awakened */
12289ef1f84bSDavid du Colombier if(q->state & Qstarve){
12299ef1f84bSDavid du Colombier q->state &= ~Qstarve;
12309ef1f84bSDavid du Colombier dowakeup = 1;
12319ef1f84bSDavid du Colombier }
12329ef1f84bSDavid du Colombier iunlock(q);
12339ef1f84bSDavid du Colombier
12349ef1f84bSDavid du Colombier /* get output going again */
12359ef1f84bSDavid du Colombier if(q->kick && (dowakeup || (q->state&Qkick)))
12369ef1f84bSDavid du Colombier q->kick(q->arg);
12379ef1f84bSDavid du Colombier
12389ef1f84bSDavid du Colombier /* wakeup anyone consuming at the other end */
12399ef1f84bSDavid du Colombier if(dowakeup){
12409ef1f84bSDavid du Colombier p = wakeup(&q->rr);
12419ef1f84bSDavid du Colombier
12429ef1f84bSDavid du Colombier /* if we just wokeup a higher priority process, let it run */
12439ef1f84bSDavid du Colombier if(p != nil && p->priority > up->priority)
12449ef1f84bSDavid du Colombier sched();
12459ef1f84bSDavid du Colombier }
12469ef1f84bSDavid du Colombier
12479ef1f84bSDavid du Colombier /*
12489ef1f84bSDavid du Colombier * flow control, wait for queue to get below the limit
12499ef1f84bSDavid du Colombier * before allowing the process to continue and queue
12509ef1f84bSDavid du Colombier * more. We do this here so that postnote can only
12519ef1f84bSDavid du Colombier * interrupt us after the data has been queued. This
12529ef1f84bSDavid du Colombier * means that things like 9p flushes and ssl messages
12539ef1f84bSDavid du Colombier * will not be disrupted by software interrupts.
12549ef1f84bSDavid du Colombier *
12559ef1f84bSDavid du Colombier * Note - this is moderately dangerous since a process
12569ef1f84bSDavid du Colombier * that keeps getting interrupted and rewriting will
12579ef1f84bSDavid du Colombier * queue infinite crud.
12589ef1f84bSDavid du Colombier */
12599ef1f84bSDavid du Colombier for(;;){
12609ef1f84bSDavid du Colombier if(q->noblock || qnotfull(q))
12619ef1f84bSDavid du Colombier break;
12629ef1f84bSDavid du Colombier
12639ef1f84bSDavid du Colombier ilock(q);
12649ef1f84bSDavid du Colombier q->state |= Qflow;
12659ef1f84bSDavid du Colombier iunlock(q);
12669ef1f84bSDavid du Colombier sleep(&q->wr, qnotfull, q);
12679ef1f84bSDavid du Colombier }
12689ef1f84bSDavid du Colombier USED(b);
12699ef1f84bSDavid du Colombier
12709ef1f84bSDavid du Colombier qunlock(&q->wlock);
12719ef1f84bSDavid du Colombier poperror();
12729ef1f84bSDavid du Colombier return n;
12739ef1f84bSDavid du Colombier }
12749ef1f84bSDavid du Colombier
12759ef1f84bSDavid du Colombier /*
12769ef1f84bSDavid du Colombier * write to a queue. only Maxatomic bytes at a time is atomic.
12779ef1f84bSDavid du Colombier */
12789ef1f84bSDavid du Colombier int
qwrite(Queue * q,void * vp,int len)12799ef1f84bSDavid du Colombier qwrite(Queue *q, void *vp, int len)
12809ef1f84bSDavid du Colombier {
12819ef1f84bSDavid du Colombier int n, sofar;
12829ef1f84bSDavid du Colombier Block *b;
12839ef1f84bSDavid du Colombier uchar *p = vp;
12849ef1f84bSDavid du Colombier
12859ef1f84bSDavid du Colombier QDEBUG if(!islo())
12869ef1f84bSDavid du Colombier print("qwrite hi %#p\n", getcallerpc(&q));
12879ef1f84bSDavid du Colombier
12889ef1f84bSDavid du Colombier sofar = 0;
12899ef1f84bSDavid du Colombier do {
12909ef1f84bSDavid du Colombier n = len-sofar;
12919ef1f84bSDavid du Colombier if(n > Maxatomic)
12929ef1f84bSDavid du Colombier n = Maxatomic;
12939ef1f84bSDavid du Colombier
12949ef1f84bSDavid du Colombier b = allocb(n);
12959ef1f84bSDavid du Colombier setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]);
12969ef1f84bSDavid du Colombier if(waserror()){
12979ef1f84bSDavid du Colombier freeb(b);
12989ef1f84bSDavid du Colombier nexterror();
12999ef1f84bSDavid du Colombier }
13009ef1f84bSDavid du Colombier memmove(b->wp, p+sofar, n);
13019ef1f84bSDavid du Colombier poperror();
13029ef1f84bSDavid du Colombier b->wp += n;
13039ef1f84bSDavid du Colombier
13049ef1f84bSDavid du Colombier qbwrite(q, b);
13059ef1f84bSDavid du Colombier
13069ef1f84bSDavid du Colombier sofar += n;
13079ef1f84bSDavid du Colombier } while(sofar < len && (q->state & Qmsg) == 0);
13089ef1f84bSDavid du Colombier
13099ef1f84bSDavid du Colombier return len;
13109ef1f84bSDavid du Colombier }
13119ef1f84bSDavid du Colombier
13129ef1f84bSDavid du Colombier /*
13139ef1f84bSDavid du Colombier * used by print() to write to a queue. Since we may be splhi or not in
13149ef1f84bSDavid du Colombier * a process, don't qlock.
13159ef1f84bSDavid du Colombier *
13169ef1f84bSDavid du Colombier * this routine merges adjacent blocks if block n+1 will fit into
13179ef1f84bSDavid du Colombier * the free space of block n.
13189ef1f84bSDavid du Colombier */
13199ef1f84bSDavid du Colombier int
qiwrite(Queue * q,void * vp,int len)13209ef1f84bSDavid du Colombier qiwrite(Queue *q, void *vp, int len)
13219ef1f84bSDavid du Colombier {
13229ef1f84bSDavid du Colombier int n, sofar, dowakeup;
13239ef1f84bSDavid du Colombier Block *b;
13249ef1f84bSDavid du Colombier uchar *p = vp;
13259ef1f84bSDavid du Colombier
13269ef1f84bSDavid du Colombier dowakeup = 0;
13279ef1f84bSDavid du Colombier
13289ef1f84bSDavid du Colombier sofar = 0;
13299ef1f84bSDavid du Colombier do {
13309ef1f84bSDavid du Colombier n = len-sofar;
13319ef1f84bSDavid du Colombier if(n > Maxatomic)
13329ef1f84bSDavid du Colombier n = Maxatomic;
13339ef1f84bSDavid du Colombier
13349ef1f84bSDavid du Colombier b = iallocb(n);
13359ef1f84bSDavid du Colombier if(b == nil)
13369ef1f84bSDavid du Colombier break;
13379ef1f84bSDavid du Colombier memmove(b->wp, p+sofar, n);
13389ef1f84bSDavid du Colombier b->wp += n;
13399ef1f84bSDavid du Colombier
13409ef1f84bSDavid du Colombier ilock(q);
13419ef1f84bSDavid du Colombier
13429ef1f84bSDavid du Colombier /* we use an artificially high limit for kernel prints since anything
13439ef1f84bSDavid du Colombier * over the limit gets dropped
13449ef1f84bSDavid du Colombier */
13459ef1f84bSDavid du Colombier if(q->dlen >= 16*1024){
13469ef1f84bSDavid du Colombier iunlock(q);
13479ef1f84bSDavid du Colombier freeb(b);
13489ef1f84bSDavid du Colombier break;
13499ef1f84bSDavid du Colombier }
13509ef1f84bSDavid du Colombier
13519ef1f84bSDavid du Colombier QDEBUG checkb(b, "qiwrite");
13529ef1f84bSDavid du Colombier if(q->bfirst)
13539ef1f84bSDavid du Colombier q->blast->next = b;
13549ef1f84bSDavid du Colombier else
13559ef1f84bSDavid du Colombier q->bfirst = b;
13569ef1f84bSDavid du Colombier q->blast = b;
13579ef1f84bSDavid du Colombier q->len += BALLOC(b);
13589ef1f84bSDavid du Colombier q->dlen += n;
13599ef1f84bSDavid du Colombier
13609ef1f84bSDavid du Colombier if(q->state & Qstarve){
13619ef1f84bSDavid du Colombier q->state &= ~Qstarve;
13629ef1f84bSDavid du Colombier dowakeup = 1;
13639ef1f84bSDavid du Colombier }
13649ef1f84bSDavid du Colombier
13659ef1f84bSDavid du Colombier iunlock(q);
13669ef1f84bSDavid du Colombier
13679ef1f84bSDavid du Colombier if(dowakeup){
13689ef1f84bSDavid du Colombier if(q->kick)
13699ef1f84bSDavid du Colombier q->kick(q->arg);
13709ef1f84bSDavid du Colombier wakeup(&q->rr);
13719ef1f84bSDavid du Colombier }
13729ef1f84bSDavid du Colombier
13739ef1f84bSDavid du Colombier sofar += n;
13749ef1f84bSDavid du Colombier } while(sofar < len && (q->state & Qmsg) == 0);
13759ef1f84bSDavid du Colombier
13769ef1f84bSDavid du Colombier return sofar;
13779ef1f84bSDavid du Colombier }
13789ef1f84bSDavid du Colombier
13799ef1f84bSDavid du Colombier /*
13809ef1f84bSDavid du Colombier * be extremely careful when calling this,
13819ef1f84bSDavid du Colombier * as there is no reference accounting
13829ef1f84bSDavid du Colombier */
13839ef1f84bSDavid du Colombier void
qfree(Queue * q)13849ef1f84bSDavid du Colombier qfree(Queue *q)
13859ef1f84bSDavid du Colombier {
13869ef1f84bSDavid du Colombier qclose(q);
13879ef1f84bSDavid du Colombier free(q);
13889ef1f84bSDavid du Colombier }
13899ef1f84bSDavid du Colombier
13909ef1f84bSDavid du Colombier /*
13919ef1f84bSDavid du Colombier * Mark a queue as closed. No further IO is permitted.
13929ef1f84bSDavid du Colombier * All blocks are released.
13939ef1f84bSDavid du Colombier */
13949ef1f84bSDavid du Colombier void
qclose(Queue * q)13959ef1f84bSDavid du Colombier qclose(Queue *q)
13969ef1f84bSDavid du Colombier {
13979ef1f84bSDavid du Colombier Block *bfirst;
13989ef1f84bSDavid du Colombier
13999ef1f84bSDavid du Colombier if(q == nil)
14009ef1f84bSDavid du Colombier return;
14019ef1f84bSDavid du Colombier
14029ef1f84bSDavid du Colombier /* mark it */
14039ef1f84bSDavid du Colombier ilock(q);
14049ef1f84bSDavid du Colombier q->state |= Qclosed;
14059ef1f84bSDavid du Colombier q->state &= ~(Qflow|Qstarve);
14069ef1f84bSDavid du Colombier strcpy(q->err, Ehungup);
14079ef1f84bSDavid du Colombier bfirst = q->bfirst;
14089ef1f84bSDavid du Colombier q->bfirst = 0;
14099ef1f84bSDavid du Colombier q->len = 0;
14109ef1f84bSDavid du Colombier q->dlen = 0;
14119ef1f84bSDavid du Colombier q->noblock = 0;
14129ef1f84bSDavid du Colombier iunlock(q);
14139ef1f84bSDavid du Colombier
14149ef1f84bSDavid du Colombier /* free queued blocks */
14159ef1f84bSDavid du Colombier freeblist(bfirst);
14169ef1f84bSDavid du Colombier
14179ef1f84bSDavid du Colombier /* wake up readers/writers */
14189ef1f84bSDavid du Colombier wakeup(&q->rr);
14199ef1f84bSDavid du Colombier wakeup(&q->wr);
14209ef1f84bSDavid du Colombier }
14219ef1f84bSDavid du Colombier
14229ef1f84bSDavid du Colombier /*
14239ef1f84bSDavid du Colombier * Mark a queue as closed. Wakeup any readers. Don't remove queued
14249ef1f84bSDavid du Colombier * blocks.
14259ef1f84bSDavid du Colombier */
14269ef1f84bSDavid du Colombier void
qhangup(Queue * q,char * msg)14279ef1f84bSDavid du Colombier qhangup(Queue *q, char *msg)
14289ef1f84bSDavid du Colombier {
14299ef1f84bSDavid du Colombier /* mark it */
14309ef1f84bSDavid du Colombier ilock(q);
14319ef1f84bSDavid du Colombier q->state |= Qclosed;
14329ef1f84bSDavid du Colombier if(msg == 0 || *msg == 0)
14339ef1f84bSDavid du Colombier strcpy(q->err, Ehungup);
14349ef1f84bSDavid du Colombier else
14359ef1f84bSDavid du Colombier strncpy(q->err, msg, ERRMAX-1);
14369ef1f84bSDavid du Colombier iunlock(q);
14379ef1f84bSDavid du Colombier
14389ef1f84bSDavid du Colombier /* wake up readers/writers */
14399ef1f84bSDavid du Colombier wakeup(&q->rr);
14409ef1f84bSDavid du Colombier wakeup(&q->wr);
14419ef1f84bSDavid du Colombier }
14429ef1f84bSDavid du Colombier
14439ef1f84bSDavid du Colombier /*
14449ef1f84bSDavid du Colombier * return non-zero if the q is hungup
14459ef1f84bSDavid du Colombier */
14469ef1f84bSDavid du Colombier int
qisclosed(Queue * q)14479ef1f84bSDavid du Colombier qisclosed(Queue *q)
14489ef1f84bSDavid du Colombier {
14499ef1f84bSDavid du Colombier return q->state & Qclosed;
14509ef1f84bSDavid du Colombier }
14519ef1f84bSDavid du Colombier
14529ef1f84bSDavid du Colombier /*
14539ef1f84bSDavid du Colombier * mark a queue as no longer hung up
14549ef1f84bSDavid du Colombier */
14559ef1f84bSDavid du Colombier void
qreopen(Queue * q)14569ef1f84bSDavid du Colombier qreopen(Queue *q)
14579ef1f84bSDavid du Colombier {
14589ef1f84bSDavid du Colombier ilock(q);
14599ef1f84bSDavid du Colombier q->state &= ~Qclosed;
14609ef1f84bSDavid du Colombier q->state |= Qstarve;
14619ef1f84bSDavid du Colombier q->eof = 0;
14629ef1f84bSDavid du Colombier q->limit = q->inilim;
14639ef1f84bSDavid du Colombier iunlock(q);
14649ef1f84bSDavid du Colombier }
14659ef1f84bSDavid du Colombier
14669ef1f84bSDavid du Colombier /*
14679ef1f84bSDavid du Colombier * return bytes queued
14689ef1f84bSDavid du Colombier */
14699ef1f84bSDavid du Colombier int
qlen(Queue * q)14709ef1f84bSDavid du Colombier qlen(Queue *q)
14719ef1f84bSDavid du Colombier {
14729ef1f84bSDavid du Colombier return q->dlen;
14739ef1f84bSDavid du Colombier }
14749ef1f84bSDavid du Colombier
14759ef1f84bSDavid du Colombier /*
14769ef1f84bSDavid du Colombier * return space remaining before flow control
14779ef1f84bSDavid du Colombier */
14789ef1f84bSDavid du Colombier int
qwindow(Queue * q)14799ef1f84bSDavid du Colombier qwindow(Queue *q)
14809ef1f84bSDavid du Colombier {
14819ef1f84bSDavid du Colombier int l;
14829ef1f84bSDavid du Colombier
14839ef1f84bSDavid du Colombier l = q->limit - q->len;
14849ef1f84bSDavid du Colombier if(l < 0)
14859ef1f84bSDavid du Colombier l = 0;
14869ef1f84bSDavid du Colombier return l;
14879ef1f84bSDavid du Colombier }
14889ef1f84bSDavid du Colombier
14899ef1f84bSDavid du Colombier /*
14909ef1f84bSDavid du Colombier * return true if we can read without blocking
14919ef1f84bSDavid du Colombier */
14929ef1f84bSDavid du Colombier int
qcanread(Queue * q)14939ef1f84bSDavid du Colombier qcanread(Queue *q)
14949ef1f84bSDavid du Colombier {
14959ef1f84bSDavid du Colombier return q->bfirst!=0;
14969ef1f84bSDavid du Colombier }
14979ef1f84bSDavid du Colombier
14989ef1f84bSDavid du Colombier /*
14999ef1f84bSDavid du Colombier * change queue limit
15009ef1f84bSDavid du Colombier */
15019ef1f84bSDavid du Colombier void
qsetlimit(Queue * q,int limit)15029ef1f84bSDavid du Colombier qsetlimit(Queue *q, int limit)
15039ef1f84bSDavid du Colombier {
15049ef1f84bSDavid du Colombier q->limit = limit;
15059ef1f84bSDavid du Colombier }
15069ef1f84bSDavid du Colombier
15079ef1f84bSDavid du Colombier /*
15089ef1f84bSDavid du Colombier * set blocking/nonblocking
15099ef1f84bSDavid du Colombier */
15109ef1f84bSDavid du Colombier void
qnoblock(Queue * q,int onoff)15119ef1f84bSDavid du Colombier qnoblock(Queue *q, int onoff)
15129ef1f84bSDavid du Colombier {
15139ef1f84bSDavid du Colombier q->noblock = onoff;
15149ef1f84bSDavid du Colombier }
15159ef1f84bSDavid du Colombier
15169ef1f84bSDavid du Colombier /*
15179ef1f84bSDavid du Colombier * flush the output queue
15189ef1f84bSDavid du Colombier */
15199ef1f84bSDavid du Colombier void
qflush(Queue * q)15209ef1f84bSDavid du Colombier qflush(Queue *q)
15219ef1f84bSDavid du Colombier {
15229ef1f84bSDavid du Colombier Block *bfirst;
15239ef1f84bSDavid du Colombier
15249ef1f84bSDavid du Colombier /* mark it */
15259ef1f84bSDavid du Colombier ilock(q);
15269ef1f84bSDavid du Colombier bfirst = q->bfirst;
15279ef1f84bSDavid du Colombier q->bfirst = 0;
15289ef1f84bSDavid du Colombier q->len = 0;
15299ef1f84bSDavid du Colombier q->dlen = 0;
15309ef1f84bSDavid du Colombier iunlock(q);
15319ef1f84bSDavid du Colombier
15329ef1f84bSDavid du Colombier /* free queued blocks */
15339ef1f84bSDavid du Colombier freeblist(bfirst);
15349ef1f84bSDavid du Colombier
15359ef1f84bSDavid du Colombier /* wake up readers/writers */
15369ef1f84bSDavid du Colombier wakeup(&q->wr);
15379ef1f84bSDavid du Colombier }
15389ef1f84bSDavid du Colombier
15399ef1f84bSDavid du Colombier int
qfull(Queue * q)15409ef1f84bSDavid du Colombier qfull(Queue *q)
15419ef1f84bSDavid du Colombier {
15429ef1f84bSDavid du Colombier return q->state & Qflow;
15439ef1f84bSDavid du Colombier }
15449ef1f84bSDavid du Colombier
15459ef1f84bSDavid du Colombier int
qstate(Queue * q)15469ef1f84bSDavid du Colombier qstate(Queue *q)
15479ef1f84bSDavid du Colombier {
15489ef1f84bSDavid du Colombier return q->state;
15499ef1f84bSDavid du Colombier }
1550