18ccd4a63SDavid du Colombier #include "u.h"
28ccd4a63SDavid du Colombier #include "lib.h"
38ccd4a63SDavid du Colombier #include "dat.h"
48ccd4a63SDavid du Colombier #include "fns.h"
58ccd4a63SDavid du Colombier #include "error.h"
68ccd4a63SDavid du Colombier
78ccd4a63SDavid du Colombier static ulong padblockcnt;
88ccd4a63SDavid du Colombier static ulong concatblockcnt;
98ccd4a63SDavid du Colombier static ulong pullupblockcnt;
108ccd4a63SDavid du Colombier static ulong copyblockcnt;
118ccd4a63SDavid du Colombier static ulong consumecnt;
128ccd4a63SDavid du Colombier static ulong producecnt;
138ccd4a63SDavid du Colombier static ulong qcopycnt;
148ccd4a63SDavid du Colombier
158ccd4a63SDavid du Colombier static int debugging;
168ccd4a63SDavid du Colombier
178ccd4a63SDavid du Colombier #define QDEBUG if(0)
188ccd4a63SDavid du Colombier
198ccd4a63SDavid du Colombier /*
208ccd4a63SDavid du Colombier * IO queues
218ccd4a63SDavid du Colombier */
228ccd4a63SDavid du Colombier struct Queue
238ccd4a63SDavid du Colombier {
248ccd4a63SDavid du Colombier Lock lk;
258ccd4a63SDavid du Colombier
268ccd4a63SDavid du Colombier Block* bfirst; /* buffer */
278ccd4a63SDavid du Colombier Block* blast;
288ccd4a63SDavid du Colombier
298ccd4a63SDavid du Colombier int len; /* bytes allocated to queue */
308ccd4a63SDavid du Colombier int dlen; /* data bytes in queue */
318ccd4a63SDavid du Colombier int limit; /* max bytes in queue */
328ccd4a63SDavid du Colombier int inilim; /* initial limit */
338ccd4a63SDavid du Colombier int state;
348ccd4a63SDavid du Colombier int noblock; /* true if writes return immediately when q full */
358ccd4a63SDavid du Colombier int eof; /* number of eofs read by user */
368ccd4a63SDavid du Colombier
378ccd4a63SDavid du Colombier void (*kick)(void*); /* restart output */
388ccd4a63SDavid du Colombier void (*bypass)(void*, Block*); /* bypass queue altogether */
398ccd4a63SDavid du Colombier void* arg; /* argument to kick */
408ccd4a63SDavid du Colombier
418ccd4a63SDavid du Colombier QLock rlock; /* mutex for reading processes */
428ccd4a63SDavid du Colombier Rendez rr; /* process waiting to read */
438ccd4a63SDavid du Colombier QLock wlock; /* mutex for writing processes */
448ccd4a63SDavid du Colombier Rendez wr; /* process waiting to write */
458ccd4a63SDavid du Colombier
468ccd4a63SDavid du Colombier char err[ERRMAX];
478ccd4a63SDavid du Colombier };
488ccd4a63SDavid du Colombier
498ccd4a63SDavid du Colombier enum
508ccd4a63SDavid du Colombier {
518ccd4a63SDavid du Colombier Maxatomic = 64*1024,
528ccd4a63SDavid du Colombier };
538ccd4a63SDavid du Colombier
548ccd4a63SDavid du Colombier uint qiomaxatomic = Maxatomic;
558ccd4a63SDavid du Colombier
568ccd4a63SDavid du Colombier void
ixsummary(void)578ccd4a63SDavid du Colombier ixsummary(void)
588ccd4a63SDavid du Colombier {
598ccd4a63SDavid du Colombier debugging ^= 1;
608ccd4a63SDavid du Colombier iallocsummary();
618ccd4a63SDavid du Colombier print("pad %lud, concat %lud, pullup %lud, copy %lud\n",
628ccd4a63SDavid du Colombier padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
638ccd4a63SDavid du Colombier print("consume %lud, produce %lud, qcopy %lud\n",
648ccd4a63SDavid du Colombier consumecnt, producecnt, qcopycnt);
658ccd4a63SDavid du Colombier }
668ccd4a63SDavid du Colombier
678ccd4a63SDavid du Colombier /*
688ccd4a63SDavid du Colombier * free a list of blocks
698ccd4a63SDavid du Colombier */
708ccd4a63SDavid du Colombier void
freeblist(Block * b)718ccd4a63SDavid du Colombier freeblist(Block *b)
728ccd4a63SDavid du Colombier {
738ccd4a63SDavid du Colombier Block *next;
748ccd4a63SDavid du Colombier
758ccd4a63SDavid du Colombier for(; b != 0; b = next){
768ccd4a63SDavid du Colombier next = b->next;
778ccd4a63SDavid du Colombier b->next = 0;
788ccd4a63SDavid du Colombier freeb(b);
798ccd4a63SDavid du Colombier }
808ccd4a63SDavid du Colombier }
818ccd4a63SDavid du Colombier
828ccd4a63SDavid du Colombier /*
838ccd4a63SDavid du Colombier * pad a block to the front (or the back if size is negative)
848ccd4a63SDavid du Colombier */
858ccd4a63SDavid du Colombier Block*
padblock(Block * bp,int size)868ccd4a63SDavid du Colombier padblock(Block *bp, int size)
878ccd4a63SDavid du Colombier {
888ccd4a63SDavid du Colombier int n;
898ccd4a63SDavid du Colombier Block *nbp;
908ccd4a63SDavid du Colombier
918ccd4a63SDavid du Colombier QDEBUG checkb(bp, "padblock 1");
928ccd4a63SDavid du Colombier if(size >= 0){
938ccd4a63SDavid du Colombier if(bp->rp - bp->base >= size){
948ccd4a63SDavid du Colombier bp->rp -= size;
958ccd4a63SDavid du Colombier return bp;
968ccd4a63SDavid du Colombier }
978ccd4a63SDavid du Colombier
988ccd4a63SDavid du Colombier if(bp->next)
99*ec59a3ddSDavid du Colombier panic("padblock 0x%p", getcallerpc(&bp));
1008ccd4a63SDavid du Colombier n = BLEN(bp);
1018ccd4a63SDavid du Colombier padblockcnt++;
1028ccd4a63SDavid du Colombier nbp = allocb(size+n);
1038ccd4a63SDavid du Colombier nbp->rp += size;
1048ccd4a63SDavid du Colombier nbp->wp = nbp->rp;
1058ccd4a63SDavid du Colombier memmove(nbp->wp, bp->rp, n);
1068ccd4a63SDavid du Colombier nbp->wp += n;
1078ccd4a63SDavid du Colombier freeb(bp);
1088ccd4a63SDavid du Colombier nbp->rp -= size;
1098ccd4a63SDavid du Colombier } else {
1108ccd4a63SDavid du Colombier size = -size;
1118ccd4a63SDavid du Colombier
1128ccd4a63SDavid du Colombier if(bp->next)
113*ec59a3ddSDavid du Colombier panic("padblock 0x%p", getcallerpc(&bp));
1148ccd4a63SDavid du Colombier
1158ccd4a63SDavid du Colombier if(bp->lim - bp->wp >= size)
1168ccd4a63SDavid du Colombier return bp;
1178ccd4a63SDavid du Colombier
1188ccd4a63SDavid du Colombier n = BLEN(bp);
1198ccd4a63SDavid du Colombier padblockcnt++;
1208ccd4a63SDavid du Colombier nbp = allocb(size+n);
1218ccd4a63SDavid du Colombier memmove(nbp->wp, bp->rp, n);
1228ccd4a63SDavid du Colombier nbp->wp += n;
1238ccd4a63SDavid du Colombier freeb(bp);
1248ccd4a63SDavid du Colombier }
1258ccd4a63SDavid du Colombier QDEBUG checkb(nbp, "padblock 1");
1268ccd4a63SDavid du Colombier return nbp;
1278ccd4a63SDavid du Colombier }
1288ccd4a63SDavid du Colombier
1298ccd4a63SDavid du Colombier /*
1308ccd4a63SDavid du Colombier * return count of bytes in a string of blocks
1318ccd4a63SDavid du Colombier */
1328ccd4a63SDavid du Colombier int
blocklen(Block * bp)1338ccd4a63SDavid du Colombier blocklen(Block *bp)
1348ccd4a63SDavid du Colombier {
1358ccd4a63SDavid du Colombier int len;
1368ccd4a63SDavid du Colombier
1378ccd4a63SDavid du Colombier len = 0;
1388ccd4a63SDavid du Colombier while(bp) {
1398ccd4a63SDavid du Colombier len += BLEN(bp);
1408ccd4a63SDavid du Colombier bp = bp->next;
1418ccd4a63SDavid du Colombier }
1428ccd4a63SDavid du Colombier return len;
1438ccd4a63SDavid du Colombier }
1448ccd4a63SDavid du Colombier
1458ccd4a63SDavid du Colombier /*
1468ccd4a63SDavid du Colombier * return count of space in blocks
1478ccd4a63SDavid du Colombier */
1488ccd4a63SDavid du Colombier int
blockalloclen(Block * bp)1498ccd4a63SDavid du Colombier blockalloclen(Block *bp)
1508ccd4a63SDavid du Colombier {
1518ccd4a63SDavid du Colombier int len;
1528ccd4a63SDavid du Colombier
1538ccd4a63SDavid du Colombier len = 0;
1548ccd4a63SDavid du Colombier while(bp) {
1558ccd4a63SDavid du Colombier len += BALLOC(bp);
1568ccd4a63SDavid du Colombier bp = bp->next;
1578ccd4a63SDavid du Colombier }
1588ccd4a63SDavid du Colombier return len;
1598ccd4a63SDavid du Colombier }
1608ccd4a63SDavid du Colombier
1618ccd4a63SDavid du Colombier /*
1628ccd4a63SDavid du Colombier * copy the string of blocks into
1638ccd4a63SDavid du Colombier * a single block and free the string
1648ccd4a63SDavid du Colombier */
1658ccd4a63SDavid du Colombier Block*
concatblock(Block * bp)1668ccd4a63SDavid du Colombier concatblock(Block *bp)
1678ccd4a63SDavid du Colombier {
1688ccd4a63SDavid du Colombier int len;
1698ccd4a63SDavid du Colombier Block *nb, *f;
1708ccd4a63SDavid du Colombier
1718ccd4a63SDavid du Colombier if(bp->next == 0)
1728ccd4a63SDavid du Colombier return bp;
1738ccd4a63SDavid du Colombier
1748ccd4a63SDavid du Colombier nb = allocb(blocklen(bp));
1758ccd4a63SDavid du Colombier for(f = bp; f; f = f->next) {
1768ccd4a63SDavid du Colombier len = BLEN(f);
1778ccd4a63SDavid du Colombier memmove(nb->wp, f->rp, len);
1788ccd4a63SDavid du Colombier nb->wp += len;
1798ccd4a63SDavid du Colombier }
1808ccd4a63SDavid du Colombier concatblockcnt += BLEN(nb);
1818ccd4a63SDavid du Colombier freeblist(bp);
1828ccd4a63SDavid du Colombier QDEBUG checkb(nb, "concatblock 1");
1838ccd4a63SDavid du Colombier return nb;
1848ccd4a63SDavid du Colombier }
1858ccd4a63SDavid du Colombier
1868ccd4a63SDavid du Colombier /*
1878ccd4a63SDavid du Colombier * make sure the first block has at least n bytes
1888ccd4a63SDavid du Colombier */
1898ccd4a63SDavid du Colombier Block*
pullupblock(Block * bp,int n)1908ccd4a63SDavid du Colombier pullupblock(Block *bp, int n)
1918ccd4a63SDavid du Colombier {
1928ccd4a63SDavid du Colombier int i;
1938ccd4a63SDavid du Colombier Block *nbp;
1948ccd4a63SDavid du Colombier
1958ccd4a63SDavid du Colombier /*
1968ccd4a63SDavid du Colombier * this should almost always be true, it's
1978ccd4a63SDavid du Colombier * just to avoid every caller checking.
1988ccd4a63SDavid du Colombier */
1998ccd4a63SDavid du Colombier if(BLEN(bp) >= n)
2008ccd4a63SDavid du Colombier return bp;
2018ccd4a63SDavid du Colombier
2028ccd4a63SDavid du Colombier /*
2038ccd4a63SDavid du Colombier * if not enough room in the first block,
2048ccd4a63SDavid du Colombier * add another to the front of the list.
2058ccd4a63SDavid du Colombier */
2068ccd4a63SDavid du Colombier if(bp->lim - bp->rp < n){
2078ccd4a63SDavid du Colombier nbp = allocb(n);
2088ccd4a63SDavid du Colombier nbp->next = bp;
2098ccd4a63SDavid du Colombier bp = nbp;
2108ccd4a63SDavid du Colombier }
2118ccd4a63SDavid du Colombier
2128ccd4a63SDavid du Colombier /*
2138ccd4a63SDavid du Colombier * copy bytes from the trailing blocks into the first
2148ccd4a63SDavid du Colombier */
2158ccd4a63SDavid du Colombier n -= BLEN(bp);
2168ccd4a63SDavid du Colombier while((nbp = bp->next)){
2178ccd4a63SDavid du Colombier i = BLEN(nbp);
2188ccd4a63SDavid du Colombier if(i > n) {
2198ccd4a63SDavid du Colombier memmove(bp->wp, nbp->rp, n);
2208ccd4a63SDavid du Colombier pullupblockcnt++;
2218ccd4a63SDavid du Colombier bp->wp += n;
2228ccd4a63SDavid du Colombier nbp->rp += n;
2238ccd4a63SDavid du Colombier QDEBUG checkb(bp, "pullupblock 1");
2248ccd4a63SDavid du Colombier return bp;
2258ccd4a63SDavid du Colombier } else {
2268ccd4a63SDavid du Colombier /* shouldn't happen but why crash if it does */
2278ccd4a63SDavid du Colombier if(i < 0){
2288ccd4a63SDavid du Colombier print("pullup negative length packet\n");
2298ccd4a63SDavid du Colombier i = 0;
2308ccd4a63SDavid du Colombier }
2318ccd4a63SDavid du Colombier memmove(bp->wp, nbp->rp, i);
2328ccd4a63SDavid du Colombier pullupblockcnt++;
2338ccd4a63SDavid du Colombier bp->wp += i;
2348ccd4a63SDavid du Colombier bp->next = nbp->next;
2358ccd4a63SDavid du Colombier nbp->next = 0;
2368ccd4a63SDavid du Colombier freeb(nbp);
2378ccd4a63SDavid du Colombier n -= i;
2388ccd4a63SDavid du Colombier if(n == 0){
2398ccd4a63SDavid du Colombier QDEBUG checkb(bp, "pullupblock 2");
2408ccd4a63SDavid du Colombier return bp;
2418ccd4a63SDavid du Colombier }
2428ccd4a63SDavid du Colombier }
2438ccd4a63SDavid du Colombier }
2448ccd4a63SDavid du Colombier freeb(bp);
2458ccd4a63SDavid du Colombier return 0;
2468ccd4a63SDavid du Colombier }
2478ccd4a63SDavid du Colombier
2488ccd4a63SDavid du Colombier /*
2498ccd4a63SDavid du Colombier * make sure the first block has at least n bytes
2508ccd4a63SDavid du Colombier */
2518ccd4a63SDavid du Colombier Block*
pullupqueue(Queue * q,int n)2528ccd4a63SDavid du Colombier pullupqueue(Queue *q, int n)
2538ccd4a63SDavid du Colombier {
2548ccd4a63SDavid du Colombier Block *b;
2558ccd4a63SDavid du Colombier
2568ccd4a63SDavid du Colombier if(BLEN(q->bfirst) >= n)
2578ccd4a63SDavid du Colombier return q->bfirst;
2588ccd4a63SDavid du Colombier q->bfirst = pullupblock(q->bfirst, n);
2598ccd4a63SDavid du Colombier for(b = q->bfirst; b != nil && b->next != nil; b = b->next)
2608ccd4a63SDavid du Colombier ;
2618ccd4a63SDavid du Colombier q->blast = b;
2628ccd4a63SDavid du Colombier return q->bfirst;
2638ccd4a63SDavid du Colombier }
2648ccd4a63SDavid du Colombier
2658ccd4a63SDavid du Colombier /*
2668ccd4a63SDavid du Colombier * trim to len bytes starting at offset
2678ccd4a63SDavid du Colombier */
2688ccd4a63SDavid du Colombier Block *
trimblock(Block * bp,int offset,int len)2698ccd4a63SDavid du Colombier trimblock(Block *bp, int offset, int len)
2708ccd4a63SDavid du Colombier {
2718ccd4a63SDavid du Colombier ulong l;
2728ccd4a63SDavid du Colombier Block *nb, *startb;
2738ccd4a63SDavid du Colombier
2748ccd4a63SDavid du Colombier QDEBUG checkb(bp, "trimblock 1");
2758ccd4a63SDavid du Colombier if(blocklen(bp) < offset+len) {
2768ccd4a63SDavid du Colombier freeblist(bp);
2778ccd4a63SDavid du Colombier return nil;
2788ccd4a63SDavid du Colombier }
2798ccd4a63SDavid du Colombier
2808ccd4a63SDavid du Colombier while((l = BLEN(bp)) < offset) {
2818ccd4a63SDavid du Colombier offset -= l;
2828ccd4a63SDavid du Colombier nb = bp->next;
2838ccd4a63SDavid du Colombier bp->next = nil;
2848ccd4a63SDavid du Colombier freeb(bp);
2858ccd4a63SDavid du Colombier bp = nb;
2868ccd4a63SDavid du Colombier }
2878ccd4a63SDavid du Colombier
2888ccd4a63SDavid du Colombier startb = bp;
2898ccd4a63SDavid du Colombier bp->rp += offset;
2908ccd4a63SDavid du Colombier
2918ccd4a63SDavid du Colombier while((l = BLEN(bp)) < len) {
2928ccd4a63SDavid du Colombier len -= l;
2938ccd4a63SDavid du Colombier bp = bp->next;
2948ccd4a63SDavid du Colombier }
2958ccd4a63SDavid du Colombier
2968ccd4a63SDavid du Colombier bp->wp -= (BLEN(bp) - len);
2978ccd4a63SDavid du Colombier
2988ccd4a63SDavid du Colombier if(bp->next) {
2998ccd4a63SDavid du Colombier freeblist(bp->next);
3008ccd4a63SDavid du Colombier bp->next = nil;
3018ccd4a63SDavid du Colombier }
3028ccd4a63SDavid du Colombier
3038ccd4a63SDavid du Colombier return startb;
3048ccd4a63SDavid du Colombier }
3058ccd4a63SDavid du Colombier
3068ccd4a63SDavid du Colombier /*
3078ccd4a63SDavid du Colombier * copy 'count' bytes into a new block
3088ccd4a63SDavid du Colombier */
3098ccd4a63SDavid du Colombier Block*
copyblock(Block * bp,int count)3108ccd4a63SDavid du Colombier copyblock(Block *bp, int count)
3118ccd4a63SDavid du Colombier {
3128ccd4a63SDavid du Colombier int l;
3138ccd4a63SDavid du Colombier Block *nbp;
3148ccd4a63SDavid du Colombier
3158ccd4a63SDavid du Colombier QDEBUG checkb(bp, "copyblock 0");
3168ccd4a63SDavid du Colombier nbp = allocb(count);
3178ccd4a63SDavid du Colombier for(; count > 0 && bp != 0; bp = bp->next){
3188ccd4a63SDavid du Colombier l = BLEN(bp);
3198ccd4a63SDavid du Colombier if(l > count)
3208ccd4a63SDavid du Colombier l = count;
3218ccd4a63SDavid du Colombier memmove(nbp->wp, bp->rp, l);
3228ccd4a63SDavid du Colombier nbp->wp += l;
3238ccd4a63SDavid du Colombier count -= l;
3248ccd4a63SDavid du Colombier }
3258ccd4a63SDavid du Colombier if(count > 0){
3268ccd4a63SDavid du Colombier memset(nbp->wp, 0, count);
3278ccd4a63SDavid du Colombier nbp->wp += count;
3288ccd4a63SDavid du Colombier }
3298ccd4a63SDavid du Colombier copyblockcnt++;
3308ccd4a63SDavid du Colombier QDEBUG checkb(nbp, "copyblock 1");
3318ccd4a63SDavid du Colombier
3328ccd4a63SDavid du Colombier return nbp;
3338ccd4a63SDavid du Colombier }
3348ccd4a63SDavid du Colombier
3358ccd4a63SDavid du Colombier Block*
adjustblock(Block * bp,int len)3368ccd4a63SDavid du Colombier adjustblock(Block* bp, int len)
3378ccd4a63SDavid du Colombier {
3388ccd4a63SDavid du Colombier int n;
3398ccd4a63SDavid du Colombier Block *nbp;
3408ccd4a63SDavid du Colombier
3418ccd4a63SDavid du Colombier if(len < 0){
3428ccd4a63SDavid du Colombier freeb(bp);
3438ccd4a63SDavid du Colombier return nil;
3448ccd4a63SDavid du Colombier }
3458ccd4a63SDavid du Colombier
3468ccd4a63SDavid du Colombier if(bp->rp+len > bp->lim){
3478ccd4a63SDavid du Colombier nbp = copyblock(bp, len);
3488ccd4a63SDavid du Colombier freeblist(bp);
3498ccd4a63SDavid du Colombier QDEBUG checkb(nbp, "adjustblock 1");
3508ccd4a63SDavid du Colombier
3518ccd4a63SDavid du Colombier return nbp;
3528ccd4a63SDavid du Colombier }
3538ccd4a63SDavid du Colombier
3548ccd4a63SDavid du Colombier n = BLEN(bp);
3558ccd4a63SDavid du Colombier if(len > n)
3568ccd4a63SDavid du Colombier memset(bp->wp, 0, len-n);
3578ccd4a63SDavid du Colombier bp->wp = bp->rp+len;
3588ccd4a63SDavid du Colombier QDEBUG checkb(bp, "adjustblock 2");
3598ccd4a63SDavid du Colombier
3608ccd4a63SDavid du Colombier return bp;
3618ccd4a63SDavid du Colombier }
3628ccd4a63SDavid du Colombier
3638ccd4a63SDavid du Colombier
3648ccd4a63SDavid du Colombier /*
3658ccd4a63SDavid du Colombier * throw away up to count bytes from a
3668ccd4a63SDavid du Colombier * list of blocks. Return count of bytes
3678ccd4a63SDavid du Colombier * thrown away.
3688ccd4a63SDavid du Colombier */
3698ccd4a63SDavid du Colombier int
pullblock(Block ** bph,int count)3708ccd4a63SDavid du Colombier pullblock(Block **bph, int count)
3718ccd4a63SDavid du Colombier {
3728ccd4a63SDavid du Colombier Block *bp;
3738ccd4a63SDavid du Colombier int n, bytes;
3748ccd4a63SDavid du Colombier
3758ccd4a63SDavid du Colombier bytes = 0;
3768ccd4a63SDavid du Colombier if(bph == nil)
3778ccd4a63SDavid du Colombier return 0;
3788ccd4a63SDavid du Colombier
3798ccd4a63SDavid du Colombier while(*bph != nil && count != 0) {
3808ccd4a63SDavid du Colombier bp = *bph;
3818ccd4a63SDavid du Colombier n = BLEN(bp);
3828ccd4a63SDavid du Colombier if(count < n)
3838ccd4a63SDavid du Colombier n = count;
3848ccd4a63SDavid du Colombier bytes += n;
3858ccd4a63SDavid du Colombier count -= n;
3868ccd4a63SDavid du Colombier bp->rp += n;
3878ccd4a63SDavid du Colombier QDEBUG checkb(bp, "pullblock ");
3888ccd4a63SDavid du Colombier if(BLEN(bp) == 0) {
3898ccd4a63SDavid du Colombier *bph = bp->next;
3908ccd4a63SDavid du Colombier bp->next = nil;
3918ccd4a63SDavid du Colombier freeb(bp);
3928ccd4a63SDavid du Colombier }
3938ccd4a63SDavid du Colombier }
3948ccd4a63SDavid du Colombier return bytes;
3958ccd4a63SDavid du Colombier }
3968ccd4a63SDavid du Colombier
3978ccd4a63SDavid du Colombier /*
3988ccd4a63SDavid du Colombier * get next block from a queue, return null if nothing there
3998ccd4a63SDavid du Colombier */
4008ccd4a63SDavid du Colombier Block*
qget(Queue * q)4018ccd4a63SDavid du Colombier qget(Queue *q)
4028ccd4a63SDavid du Colombier {
4038ccd4a63SDavid du Colombier int dowakeup;
4048ccd4a63SDavid du Colombier Block *b;
4058ccd4a63SDavid du Colombier
4068ccd4a63SDavid du Colombier /* sync with qwrite */
4078ccd4a63SDavid du Colombier ilock(&q->lk);
4088ccd4a63SDavid du Colombier
4098ccd4a63SDavid du Colombier b = q->bfirst;
4108ccd4a63SDavid du Colombier if(b == nil){
4118ccd4a63SDavid du Colombier q->state |= Qstarve;
4128ccd4a63SDavid du Colombier iunlock(&q->lk);
4138ccd4a63SDavid du Colombier return nil;
4148ccd4a63SDavid du Colombier }
4158ccd4a63SDavid du Colombier q->bfirst = b->next;
4168ccd4a63SDavid du Colombier b->next = 0;
4178ccd4a63SDavid du Colombier q->len -= BALLOC(b);
4188ccd4a63SDavid du Colombier q->dlen -= BLEN(b);
4198ccd4a63SDavid du Colombier QDEBUG checkb(b, "qget");
4208ccd4a63SDavid du Colombier
4218ccd4a63SDavid du Colombier /* if writer flow controlled, restart */
4228ccd4a63SDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){
4238ccd4a63SDavid du Colombier q->state &= ~Qflow;
4248ccd4a63SDavid du Colombier dowakeup = 1;
4258ccd4a63SDavid du Colombier } else
4268ccd4a63SDavid du Colombier dowakeup = 0;
4278ccd4a63SDavid du Colombier
4288ccd4a63SDavid du Colombier iunlock(&q->lk);
4298ccd4a63SDavid du Colombier
4308ccd4a63SDavid du Colombier if(dowakeup)
4318ccd4a63SDavid du Colombier wakeup(&q->wr);
4328ccd4a63SDavid du Colombier
4338ccd4a63SDavid du Colombier return b;
4348ccd4a63SDavid du Colombier }
4358ccd4a63SDavid du Colombier
4368ccd4a63SDavid du Colombier /*
4378ccd4a63SDavid du Colombier * throw away the next 'len' bytes in the queue
4388ccd4a63SDavid du Colombier */
4398ccd4a63SDavid du Colombier int
qdiscard(Queue * q,int len)4408ccd4a63SDavid du Colombier qdiscard(Queue *q, int len)
4418ccd4a63SDavid du Colombier {
4428ccd4a63SDavid du Colombier Block *b;
4438ccd4a63SDavid du Colombier int dowakeup, n, sofar;
4448ccd4a63SDavid du Colombier
4458ccd4a63SDavid du Colombier ilock(&q->lk);
4468ccd4a63SDavid du Colombier for(sofar = 0; sofar < len; sofar += n){
4478ccd4a63SDavid du Colombier b = q->bfirst;
4488ccd4a63SDavid du Colombier if(b == nil)
4498ccd4a63SDavid du Colombier break;
4508ccd4a63SDavid du Colombier QDEBUG checkb(b, "qdiscard");
4518ccd4a63SDavid du Colombier n = BLEN(b);
4528ccd4a63SDavid du Colombier if(n <= len - sofar){
4538ccd4a63SDavid du Colombier q->bfirst = b->next;
4548ccd4a63SDavid du Colombier b->next = 0;
4558ccd4a63SDavid du Colombier q->len -= BALLOC(b);
4568ccd4a63SDavid du Colombier q->dlen -= BLEN(b);
4578ccd4a63SDavid du Colombier freeb(b);
4588ccd4a63SDavid du Colombier } else {
4598ccd4a63SDavid du Colombier n = len - sofar;
4608ccd4a63SDavid du Colombier b->rp += n;
4618ccd4a63SDavid du Colombier q->dlen -= n;
4628ccd4a63SDavid du Colombier }
4638ccd4a63SDavid du Colombier }
4648ccd4a63SDavid du Colombier
4658ccd4a63SDavid du Colombier /*
4668ccd4a63SDavid du Colombier * if writer flow controlled, restart
4678ccd4a63SDavid du Colombier *
4688ccd4a63SDavid du Colombier * This used to be
4698ccd4a63SDavid du Colombier * q->len < q->limit/2
4708ccd4a63SDavid du Colombier * but it slows down tcp too much for certain write sizes.
4718ccd4a63SDavid du Colombier * I really don't understand it completely. It may be
4728ccd4a63SDavid du Colombier * due to the queue draining so fast that the transmission
4738ccd4a63SDavid du Colombier * stalls waiting for the app to produce more data. - presotto
4748ccd4a63SDavid du Colombier */
4758ccd4a63SDavid du Colombier if((q->state & Qflow) && q->len < q->limit){
4768ccd4a63SDavid du Colombier q->state &= ~Qflow;
4778ccd4a63SDavid du Colombier dowakeup = 1;
4788ccd4a63SDavid du Colombier } else
4798ccd4a63SDavid du Colombier dowakeup = 0;
4808ccd4a63SDavid du Colombier
4818ccd4a63SDavid du Colombier iunlock(&q->lk);
4828ccd4a63SDavid du Colombier
4838ccd4a63SDavid du Colombier if(dowakeup)
4848ccd4a63SDavid du Colombier wakeup(&q->wr);
4858ccd4a63SDavid du Colombier
4868ccd4a63SDavid du Colombier return sofar;
4878ccd4a63SDavid du Colombier }
4888ccd4a63SDavid du Colombier
4898ccd4a63SDavid du Colombier /*
4908ccd4a63SDavid du Colombier * Interrupt level copy out of a queue, return # bytes copied.
4918ccd4a63SDavid du Colombier */
4928ccd4a63SDavid du Colombier int
qconsume(Queue * q,void * vp,int len)4938ccd4a63SDavid du Colombier qconsume(Queue *q, void *vp, int len)
4948ccd4a63SDavid du Colombier {
4958ccd4a63SDavid du Colombier Block *b;
4968ccd4a63SDavid du Colombier int n, dowakeup;
4978ccd4a63SDavid du Colombier uchar *p = vp;
4988ccd4a63SDavid du Colombier Block *tofree = nil;
4998ccd4a63SDavid du Colombier
5008ccd4a63SDavid du Colombier /* sync with qwrite */
5018ccd4a63SDavid du Colombier ilock(&q->lk);
5028ccd4a63SDavid du Colombier
5038ccd4a63SDavid du Colombier for(;;) {
5048ccd4a63SDavid du Colombier b = q->bfirst;
5058ccd4a63SDavid du Colombier if(b == 0){
5068ccd4a63SDavid du Colombier q->state |= Qstarve;
5078ccd4a63SDavid du Colombier iunlock(&q->lk);
5088ccd4a63SDavid du Colombier return -1;
5098ccd4a63SDavid du Colombier }
5108ccd4a63SDavid du Colombier QDEBUG checkb(b, "qconsume 1");
5118ccd4a63SDavid du Colombier
5128ccd4a63SDavid du Colombier n = BLEN(b);
5138ccd4a63SDavid du Colombier if(n > 0)
5148ccd4a63SDavid du Colombier break;
5158ccd4a63SDavid du Colombier q->bfirst = b->next;
5168ccd4a63SDavid du Colombier q->len -= BALLOC(b);
5178ccd4a63SDavid du Colombier
5188ccd4a63SDavid du Colombier /* remember to free this */
5198ccd4a63SDavid du Colombier b->next = tofree;
5208ccd4a63SDavid du Colombier tofree = b;
5218ccd4a63SDavid du Colombier };
5228ccd4a63SDavid du Colombier
5238ccd4a63SDavid du Colombier if(n < len)
5248ccd4a63SDavid du Colombier len = n;
5258ccd4a63SDavid du Colombier memmove(p, b->rp, len);
5268ccd4a63SDavid du Colombier consumecnt += n;
5278ccd4a63SDavid du Colombier b->rp += len;
5288ccd4a63SDavid du Colombier q->dlen -= len;
5298ccd4a63SDavid du Colombier
5308ccd4a63SDavid du Colombier /* discard the block if we're done with it */
5318ccd4a63SDavid du Colombier if((q->state & Qmsg) || len == n){
5328ccd4a63SDavid du Colombier q->bfirst = b->next;
5338ccd4a63SDavid du Colombier b->next = 0;
5348ccd4a63SDavid du Colombier q->len -= BALLOC(b);
5358ccd4a63SDavid du Colombier q->dlen -= BLEN(b);
5368ccd4a63SDavid du Colombier
5378ccd4a63SDavid du Colombier /* remember to free this */
5388ccd4a63SDavid du Colombier b->next = tofree;
5398ccd4a63SDavid du Colombier tofree = b;
5408ccd4a63SDavid du Colombier }
5418ccd4a63SDavid du Colombier
5428ccd4a63SDavid du Colombier /* if writer flow controlled, restart */
5438ccd4a63SDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){
5448ccd4a63SDavid du Colombier q->state &= ~Qflow;
5458ccd4a63SDavid du Colombier dowakeup = 1;
5468ccd4a63SDavid du Colombier } else
5478ccd4a63SDavid du Colombier dowakeup = 0;
5488ccd4a63SDavid du Colombier
5498ccd4a63SDavid du Colombier iunlock(&q->lk);
5508ccd4a63SDavid du Colombier
5518ccd4a63SDavid du Colombier if(dowakeup)
5528ccd4a63SDavid du Colombier wakeup(&q->wr);
5538ccd4a63SDavid du Colombier
5548ccd4a63SDavid du Colombier if(tofree != nil)
5558ccd4a63SDavid du Colombier freeblist(tofree);
5568ccd4a63SDavid du Colombier
5578ccd4a63SDavid du Colombier return len;
5588ccd4a63SDavid du Colombier }
5598ccd4a63SDavid du Colombier
5608ccd4a63SDavid du Colombier int
qpass(Queue * q,Block * b)5618ccd4a63SDavid du Colombier qpass(Queue *q, Block *b)
5628ccd4a63SDavid du Colombier {
5638ccd4a63SDavid du Colombier int dlen, len, dowakeup;
5648ccd4a63SDavid du Colombier
5658ccd4a63SDavid du Colombier /* sync with qread */
5668ccd4a63SDavid du Colombier dowakeup = 0;
5678ccd4a63SDavid du Colombier ilock(&q->lk);
5688ccd4a63SDavid du Colombier if(q->len >= q->limit){
5698ccd4a63SDavid du Colombier freeblist(b);
5708ccd4a63SDavid du Colombier iunlock(&q->lk);
5718ccd4a63SDavid du Colombier return -1;
5728ccd4a63SDavid du Colombier }
5738ccd4a63SDavid du Colombier if(q->state & Qclosed){
5748ccd4a63SDavid du Colombier freeblist(b);
5758ccd4a63SDavid du Colombier iunlock(&q->lk);
5768ccd4a63SDavid du Colombier return BALLOC(b);
5778ccd4a63SDavid du Colombier }
5788ccd4a63SDavid du Colombier
5798ccd4a63SDavid du Colombier /* add buffer to queue */
5808ccd4a63SDavid du Colombier if(q->bfirst)
5818ccd4a63SDavid du Colombier q->blast->next = b;
5828ccd4a63SDavid du Colombier else
5838ccd4a63SDavid du Colombier q->bfirst = b;
5848ccd4a63SDavid du Colombier len = BALLOC(b);
5858ccd4a63SDavid du Colombier dlen = BLEN(b);
5868ccd4a63SDavid du Colombier QDEBUG checkb(b, "qpass");
5878ccd4a63SDavid du Colombier while(b->next){
5888ccd4a63SDavid du Colombier b = b->next;
5898ccd4a63SDavid du Colombier QDEBUG checkb(b, "qpass");
5908ccd4a63SDavid du Colombier len += BALLOC(b);
5918ccd4a63SDavid du Colombier dlen += BLEN(b);
5928ccd4a63SDavid du Colombier }
5938ccd4a63SDavid du Colombier q->blast = b;
5948ccd4a63SDavid du Colombier q->len += len;
5958ccd4a63SDavid du Colombier q->dlen += dlen;
5968ccd4a63SDavid du Colombier
5978ccd4a63SDavid du Colombier if(q->len >= q->limit/2)
5988ccd4a63SDavid du Colombier q->state |= Qflow;
5998ccd4a63SDavid du Colombier
6008ccd4a63SDavid du Colombier if(q->state & Qstarve){
6018ccd4a63SDavid du Colombier q->state &= ~Qstarve;
6028ccd4a63SDavid du Colombier dowakeup = 1;
6038ccd4a63SDavid du Colombier }
6048ccd4a63SDavid du Colombier iunlock(&q->lk);
6058ccd4a63SDavid du Colombier
6068ccd4a63SDavid du Colombier if(dowakeup)
6078ccd4a63SDavid du Colombier wakeup(&q->rr);
6088ccd4a63SDavid du Colombier
6098ccd4a63SDavid du Colombier return len;
6108ccd4a63SDavid du Colombier }
6118ccd4a63SDavid du Colombier
6128ccd4a63SDavid du Colombier int
qpassnolim(Queue * q,Block * b)6138ccd4a63SDavid du Colombier qpassnolim(Queue *q, Block *b)
6148ccd4a63SDavid du Colombier {
6158ccd4a63SDavid du Colombier int dlen, len, dowakeup;
6168ccd4a63SDavid du Colombier
6178ccd4a63SDavid du Colombier /* sync with qread */
6188ccd4a63SDavid du Colombier dowakeup = 0;
6198ccd4a63SDavid du Colombier ilock(&q->lk);
6208ccd4a63SDavid du Colombier
6218ccd4a63SDavid du Colombier if(q->state & Qclosed){
6228ccd4a63SDavid du Colombier freeblist(b);
6238ccd4a63SDavid du Colombier iunlock(&q->lk);
6248ccd4a63SDavid du Colombier return BALLOC(b);
6258ccd4a63SDavid du Colombier }
6268ccd4a63SDavid du Colombier
6278ccd4a63SDavid du Colombier /* add buffer to queue */
6288ccd4a63SDavid du Colombier if(q->bfirst)
6298ccd4a63SDavid du Colombier q->blast->next = b;
6308ccd4a63SDavid du Colombier else
6318ccd4a63SDavid du Colombier q->bfirst = b;
6328ccd4a63SDavid du Colombier len = BALLOC(b);
6338ccd4a63SDavid du Colombier dlen = BLEN(b);
6348ccd4a63SDavid du Colombier QDEBUG checkb(b, "qpass");
6358ccd4a63SDavid du Colombier while(b->next){
6368ccd4a63SDavid du Colombier b = b->next;
6378ccd4a63SDavid du Colombier QDEBUG checkb(b, "qpass");
6388ccd4a63SDavid du Colombier len += BALLOC(b);
6398ccd4a63SDavid du Colombier dlen += BLEN(b);
6408ccd4a63SDavid du Colombier }
6418ccd4a63SDavid du Colombier q->blast = b;
6428ccd4a63SDavid du Colombier q->len += len;
6438ccd4a63SDavid du Colombier q->dlen += dlen;
6448ccd4a63SDavid du Colombier
6458ccd4a63SDavid du Colombier if(q->len >= q->limit/2)
6468ccd4a63SDavid du Colombier q->state |= Qflow;
6478ccd4a63SDavid du Colombier
6488ccd4a63SDavid du Colombier if(q->state & Qstarve){
6498ccd4a63SDavid du Colombier q->state &= ~Qstarve;
6508ccd4a63SDavid du Colombier dowakeup = 1;
6518ccd4a63SDavid du Colombier }
6528ccd4a63SDavid du Colombier iunlock(&q->lk);
6538ccd4a63SDavid du Colombier
6548ccd4a63SDavid du Colombier if(dowakeup)
6558ccd4a63SDavid du Colombier wakeup(&q->rr);
6568ccd4a63SDavid du Colombier
6578ccd4a63SDavid du Colombier return len;
6588ccd4a63SDavid du Colombier }
6598ccd4a63SDavid du Colombier
6608ccd4a63SDavid du Colombier /*
6618ccd4a63SDavid du Colombier * if the allocated space is way out of line with the used
6628ccd4a63SDavid du Colombier * space, reallocate to a smaller block
6638ccd4a63SDavid du Colombier */
6648ccd4a63SDavid du Colombier Block*
packblock(Block * bp)6658ccd4a63SDavid du Colombier packblock(Block *bp)
6668ccd4a63SDavid du Colombier {
6678ccd4a63SDavid du Colombier Block **l, *nbp;
6688ccd4a63SDavid du Colombier int n;
6698ccd4a63SDavid du Colombier
6708ccd4a63SDavid du Colombier for(l = &bp; *l; l = &(*l)->next){
6718ccd4a63SDavid du Colombier nbp = *l;
6728ccd4a63SDavid du Colombier n = BLEN(nbp);
6738ccd4a63SDavid du Colombier if((n<<2) < BALLOC(nbp)){
6748ccd4a63SDavid du Colombier *l = allocb(n);
6758ccd4a63SDavid du Colombier memmove((*l)->wp, nbp->rp, n);
6768ccd4a63SDavid du Colombier (*l)->wp += n;
6778ccd4a63SDavid du Colombier (*l)->next = nbp->next;
6788ccd4a63SDavid du Colombier freeb(nbp);
6798ccd4a63SDavid du Colombier }
6808ccd4a63SDavid du Colombier }
6818ccd4a63SDavid du Colombier
6828ccd4a63SDavid du Colombier return bp;
6838ccd4a63SDavid du Colombier }
6848ccd4a63SDavid du Colombier
6858ccd4a63SDavid du Colombier int
qproduce(Queue * q,void * vp,int len)6868ccd4a63SDavid du Colombier qproduce(Queue *q, void *vp, int len)
6878ccd4a63SDavid du Colombier {
6888ccd4a63SDavid du Colombier Block *b;
6898ccd4a63SDavid du Colombier int dowakeup;
6908ccd4a63SDavid du Colombier uchar *p = vp;
6918ccd4a63SDavid du Colombier
6928ccd4a63SDavid du Colombier /* sync with qread */
6938ccd4a63SDavid du Colombier dowakeup = 0;
6948ccd4a63SDavid du Colombier ilock(&q->lk);
6958ccd4a63SDavid du Colombier
6968ccd4a63SDavid du Colombier /* no waiting receivers, room in buffer? */
6978ccd4a63SDavid du Colombier if(q->len >= q->limit){
6988ccd4a63SDavid du Colombier q->state |= Qflow;
6998ccd4a63SDavid du Colombier iunlock(&q->lk);
7008ccd4a63SDavid du Colombier return -1;
7018ccd4a63SDavid du Colombier }
7028ccd4a63SDavid du Colombier
7038ccd4a63SDavid du Colombier /* save in buffer */
7048ccd4a63SDavid du Colombier b = iallocb(len);
7058ccd4a63SDavid du Colombier if(b == 0){
7068ccd4a63SDavid du Colombier iunlock(&q->lk);
7078ccd4a63SDavid du Colombier return 0;
7088ccd4a63SDavid du Colombier }
7098ccd4a63SDavid du Colombier memmove(b->wp, p, len);
7108ccd4a63SDavid du Colombier producecnt += len;
7118ccd4a63SDavid du Colombier b->wp += len;
7128ccd4a63SDavid du Colombier if(q->bfirst)
7138ccd4a63SDavid du Colombier q->blast->next = b;
7148ccd4a63SDavid du Colombier else
7158ccd4a63SDavid du Colombier q->bfirst = b;
7168ccd4a63SDavid du Colombier q->blast = b;
7178ccd4a63SDavid du Colombier /* b->next = 0; done by iallocb() */
7188ccd4a63SDavid du Colombier q->len += BALLOC(b);
7198ccd4a63SDavid du Colombier q->dlen += BLEN(b);
7208ccd4a63SDavid du Colombier QDEBUG checkb(b, "qproduce");
7218ccd4a63SDavid du Colombier
7228ccd4a63SDavid du Colombier if(q->state & Qstarve){
7238ccd4a63SDavid du Colombier q->state &= ~Qstarve;
7248ccd4a63SDavid du Colombier dowakeup = 1;
7258ccd4a63SDavid du Colombier }
7268ccd4a63SDavid du Colombier
7278ccd4a63SDavid du Colombier if(q->len >= q->limit)
7288ccd4a63SDavid du Colombier q->state |= Qflow;
7298ccd4a63SDavid du Colombier iunlock(&q->lk);
7308ccd4a63SDavid du Colombier
7318ccd4a63SDavid du Colombier if(dowakeup)
7328ccd4a63SDavid du Colombier wakeup(&q->rr);
7338ccd4a63SDavid du Colombier
7348ccd4a63SDavid du Colombier return len;
7358ccd4a63SDavid du Colombier }
7368ccd4a63SDavid du Colombier
7378ccd4a63SDavid du Colombier /*
7388ccd4a63SDavid du Colombier * copy from offset in the queue
7398ccd4a63SDavid du Colombier */
7408ccd4a63SDavid du Colombier Block*
qcopy(Queue * q,int len,ulong offset)7418ccd4a63SDavid du Colombier qcopy(Queue *q, int len, ulong offset)
7428ccd4a63SDavid du Colombier {
7438ccd4a63SDavid du Colombier int sofar;
7448ccd4a63SDavid du Colombier int n;
7458ccd4a63SDavid du Colombier Block *b, *nb;
7468ccd4a63SDavid du Colombier uchar *p;
7478ccd4a63SDavid du Colombier
7488ccd4a63SDavid du Colombier nb = allocb(len);
7498ccd4a63SDavid du Colombier
7508ccd4a63SDavid du Colombier ilock(&q->lk);
7518ccd4a63SDavid du Colombier
7528ccd4a63SDavid du Colombier /* go to offset */
7538ccd4a63SDavid du Colombier b = q->bfirst;
7548ccd4a63SDavid du Colombier for(sofar = 0; ; sofar += n){
7558ccd4a63SDavid du Colombier if(b == nil){
7568ccd4a63SDavid du Colombier iunlock(&q->lk);
7578ccd4a63SDavid du Colombier return nb;
7588ccd4a63SDavid du Colombier }
7598ccd4a63SDavid du Colombier n = BLEN(b);
7608ccd4a63SDavid du Colombier if(sofar + n > offset){
7618ccd4a63SDavid du Colombier p = b->rp + offset - sofar;
7628ccd4a63SDavid du Colombier n -= offset - sofar;
7638ccd4a63SDavid du Colombier break;
7648ccd4a63SDavid du Colombier }
7658ccd4a63SDavid du Colombier QDEBUG checkb(b, "qcopy");
7668ccd4a63SDavid du Colombier b = b->next;
7678ccd4a63SDavid du Colombier }
7688ccd4a63SDavid du Colombier
7698ccd4a63SDavid du Colombier /* copy bytes from there */
7708ccd4a63SDavid du Colombier for(sofar = 0; sofar < len;){
7718ccd4a63SDavid du Colombier if(n > len - sofar)
7728ccd4a63SDavid du Colombier n = len - sofar;
7738ccd4a63SDavid du Colombier memmove(nb->wp, p, n);
7748ccd4a63SDavid du Colombier qcopycnt += n;
7758ccd4a63SDavid du Colombier sofar += n;
7768ccd4a63SDavid du Colombier nb->wp += n;
7778ccd4a63SDavid du Colombier b = b->next;
7788ccd4a63SDavid du Colombier if(b == nil)
7798ccd4a63SDavid du Colombier break;
7808ccd4a63SDavid du Colombier n = BLEN(b);
7818ccd4a63SDavid du Colombier p = b->rp;
7828ccd4a63SDavid du Colombier }
7838ccd4a63SDavid du Colombier iunlock(&q->lk);
7848ccd4a63SDavid du Colombier
7858ccd4a63SDavid du Colombier return nb;
7868ccd4a63SDavid du Colombier }
7878ccd4a63SDavid du Colombier
7888ccd4a63SDavid du Colombier /*
7898ccd4a63SDavid du Colombier * called by non-interrupt code
7908ccd4a63SDavid du Colombier */
7918ccd4a63SDavid du Colombier Queue*
qopen(int limit,int msg,void (* kick)(void *),void * arg)7928ccd4a63SDavid du Colombier qopen(int limit, int msg, void (*kick)(void*), void *arg)
7938ccd4a63SDavid du Colombier {
7948ccd4a63SDavid du Colombier Queue *q;
7958ccd4a63SDavid du Colombier
7968ccd4a63SDavid du Colombier q = malloc(sizeof(Queue));
7978ccd4a63SDavid du Colombier if(q == 0)
7988ccd4a63SDavid du Colombier return 0;
7998ccd4a63SDavid du Colombier
8008ccd4a63SDavid du Colombier q->limit = q->inilim = limit;
8018ccd4a63SDavid du Colombier q->kick = kick;
8028ccd4a63SDavid du Colombier q->arg = arg;
8038ccd4a63SDavid du Colombier q->state = msg;
8048ccd4a63SDavid du Colombier
8058ccd4a63SDavid du Colombier q->state |= Qstarve;
8068ccd4a63SDavid du Colombier q->eof = 0;
8078ccd4a63SDavid du Colombier q->noblock = 0;
8088ccd4a63SDavid du Colombier
8098ccd4a63SDavid du Colombier return q;
8108ccd4a63SDavid du Colombier }
8118ccd4a63SDavid du Colombier
8128ccd4a63SDavid du Colombier /* open a queue to be bypassed */
8138ccd4a63SDavid du Colombier Queue*
qbypass(void (* bypass)(void *,Block *),void * arg)8148ccd4a63SDavid du Colombier qbypass(void (*bypass)(void*, Block*), void *arg)
8158ccd4a63SDavid du Colombier {
8168ccd4a63SDavid du Colombier Queue *q;
8178ccd4a63SDavid du Colombier
8188ccd4a63SDavid du Colombier q = malloc(sizeof(Queue));
8198ccd4a63SDavid du Colombier if(q == 0)
8208ccd4a63SDavid du Colombier return 0;
8218ccd4a63SDavid du Colombier
8228ccd4a63SDavid du Colombier q->limit = 0;
8238ccd4a63SDavid du Colombier q->arg = arg;
8248ccd4a63SDavid du Colombier q->bypass = bypass;
8258ccd4a63SDavid du Colombier q->state = 0;
8268ccd4a63SDavid du Colombier
8278ccd4a63SDavid du Colombier return q;
8288ccd4a63SDavid du Colombier }
8298ccd4a63SDavid du Colombier
8308ccd4a63SDavid du Colombier static int
notempty(void * a)8318ccd4a63SDavid du Colombier notempty(void *a)
8328ccd4a63SDavid du Colombier {
8338ccd4a63SDavid du Colombier Queue *q = a;
8348ccd4a63SDavid du Colombier
8358ccd4a63SDavid du Colombier return (q->state & Qclosed) || q->bfirst != 0;
8368ccd4a63SDavid du Colombier }
8378ccd4a63SDavid du Colombier
8388ccd4a63SDavid du Colombier /*
8398ccd4a63SDavid du Colombier * wait for the queue to be non-empty or closed.
8408ccd4a63SDavid du Colombier * called with q ilocked.
8418ccd4a63SDavid du Colombier */
8428ccd4a63SDavid du Colombier static int
qwait(Queue * q)8438ccd4a63SDavid du Colombier qwait(Queue *q)
8448ccd4a63SDavid du Colombier {
8458ccd4a63SDavid du Colombier /* wait for data */
8468ccd4a63SDavid du Colombier for(;;){
8478ccd4a63SDavid du Colombier if(q->bfirst != nil)
8488ccd4a63SDavid du Colombier break;
8498ccd4a63SDavid du Colombier
8508ccd4a63SDavid du Colombier if(q->state & Qclosed){
8518ccd4a63SDavid du Colombier if(++q->eof > 3)
8528ccd4a63SDavid du Colombier return -1;
8538ccd4a63SDavid du Colombier if(*q->err && strcmp(q->err, Ehungup) != 0)
8548ccd4a63SDavid du Colombier return -1;
8558ccd4a63SDavid du Colombier return 0;
8568ccd4a63SDavid du Colombier }
8578ccd4a63SDavid du Colombier
8588ccd4a63SDavid du Colombier q->state |= Qstarve; /* flag requesting producer to wake me */
8598ccd4a63SDavid du Colombier iunlock(&q->lk);
8608ccd4a63SDavid du Colombier sleep(&q->rr, notempty, q);
8618ccd4a63SDavid du Colombier ilock(&q->lk);
8628ccd4a63SDavid du Colombier }
8638ccd4a63SDavid du Colombier return 1;
8648ccd4a63SDavid du Colombier }
8658ccd4a63SDavid du Colombier
8668ccd4a63SDavid du Colombier /*
8678ccd4a63SDavid du Colombier * add a block list to a queue
8688ccd4a63SDavid du Colombier */
8698ccd4a63SDavid du Colombier void
qaddlist(Queue * q,Block * b)8708ccd4a63SDavid du Colombier qaddlist(Queue *q, Block *b)
8718ccd4a63SDavid du Colombier {
8728ccd4a63SDavid du Colombier /* queue the block */
8738ccd4a63SDavid du Colombier if(q->bfirst)
8748ccd4a63SDavid du Colombier q->blast->next = b;
8758ccd4a63SDavid du Colombier else
8768ccd4a63SDavid du Colombier q->bfirst = b;
8778ccd4a63SDavid du Colombier q->len += blockalloclen(b);
8788ccd4a63SDavid du Colombier q->dlen += blocklen(b);
8798ccd4a63SDavid du Colombier while(b->next)
8808ccd4a63SDavid du Colombier b = b->next;
8818ccd4a63SDavid du Colombier q->blast = b;
8828ccd4a63SDavid du Colombier }
8838ccd4a63SDavid du Colombier
8848ccd4a63SDavid du Colombier /*
8858ccd4a63SDavid du Colombier * called with q ilocked
8868ccd4a63SDavid du Colombier */
8878ccd4a63SDavid du Colombier Block*
qremove(Queue * q)8888ccd4a63SDavid du Colombier qremove(Queue *q)
8898ccd4a63SDavid du Colombier {
8908ccd4a63SDavid du Colombier Block *b;
8918ccd4a63SDavid du Colombier
8928ccd4a63SDavid du Colombier b = q->bfirst;
8938ccd4a63SDavid du Colombier if(b == nil)
8948ccd4a63SDavid du Colombier return nil;
8958ccd4a63SDavid du Colombier q->bfirst = b->next;
8968ccd4a63SDavid du Colombier b->next = nil;
8978ccd4a63SDavid du Colombier q->dlen -= BLEN(b);
8988ccd4a63SDavid du Colombier q->len -= BALLOC(b);
8998ccd4a63SDavid du Colombier QDEBUG checkb(b, "qremove");
9008ccd4a63SDavid du Colombier return b;
9018ccd4a63SDavid du Colombier }
9028ccd4a63SDavid du Colombier
9038ccd4a63SDavid du Colombier /*
9048ccd4a63SDavid du Colombier * copy the contents of a string of blocks into
9058ccd4a63SDavid du Colombier * memory. emptied blocks are freed. return
9068ccd4a63SDavid du Colombier * pointer to first unconsumed block.
9078ccd4a63SDavid du Colombier */
9088ccd4a63SDavid du Colombier Block*
bl2mem(uchar * p,Block * b,int n)9098ccd4a63SDavid du Colombier bl2mem(uchar *p, Block *b, int n)
9108ccd4a63SDavid du Colombier {
9118ccd4a63SDavid du Colombier int i;
9128ccd4a63SDavid du Colombier Block *next;
9138ccd4a63SDavid du Colombier
9148ccd4a63SDavid du Colombier for(; b != nil; b = next){
9158ccd4a63SDavid du Colombier i = BLEN(b);
9168ccd4a63SDavid du Colombier if(i > n){
9178ccd4a63SDavid du Colombier memmove(p, b->rp, n);
9188ccd4a63SDavid du Colombier b->rp += n;
9198ccd4a63SDavid du Colombier return b;
9208ccd4a63SDavid du Colombier }
9218ccd4a63SDavid du Colombier memmove(p, b->rp, i);
9228ccd4a63SDavid du Colombier n -= i;
9238ccd4a63SDavid du Colombier p += i;
9248ccd4a63SDavid du Colombier b->rp += i;
9258ccd4a63SDavid du Colombier next = b->next;
9268ccd4a63SDavid du Colombier freeb(b);
9278ccd4a63SDavid du Colombier }
9288ccd4a63SDavid du Colombier return nil;
9298ccd4a63SDavid du Colombier }
9308ccd4a63SDavid du Colombier
9318ccd4a63SDavid du Colombier /*
9328ccd4a63SDavid du Colombier * copy the contents of memory into a string of blocks.
9338ccd4a63SDavid du Colombier * return nil on error.
9348ccd4a63SDavid du Colombier */
9358ccd4a63SDavid du Colombier Block*
mem2bl(uchar * p,int len)9368ccd4a63SDavid du Colombier mem2bl(uchar *p, int len)
9378ccd4a63SDavid du Colombier {
9388ccd4a63SDavid du Colombier int n;
9398ccd4a63SDavid du Colombier Block *b, *first, **l;
9408ccd4a63SDavid du Colombier
9418ccd4a63SDavid du Colombier first = nil;
9428ccd4a63SDavid du Colombier l = &first;
9438ccd4a63SDavid du Colombier if(waserror()){
9448ccd4a63SDavid du Colombier freeblist(first);
9458ccd4a63SDavid du Colombier nexterror();
9468ccd4a63SDavid du Colombier }
9478ccd4a63SDavid du Colombier do {
9488ccd4a63SDavid du Colombier n = len;
9498ccd4a63SDavid du Colombier if(n > Maxatomic)
9508ccd4a63SDavid du Colombier n = Maxatomic;
9518ccd4a63SDavid du Colombier
9528ccd4a63SDavid du Colombier *l = b = allocb(n);
9538ccd4a63SDavid du Colombier /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
9548ccd4a63SDavid du Colombier memmove(b->wp, p, n);
9558ccd4a63SDavid du Colombier b->wp += n;
9568ccd4a63SDavid du Colombier p += n;
9578ccd4a63SDavid du Colombier len -= n;
9588ccd4a63SDavid du Colombier l = &b->next;
9598ccd4a63SDavid du Colombier } while(len > 0);
9608ccd4a63SDavid du Colombier poperror();
9618ccd4a63SDavid du Colombier
9628ccd4a63SDavid du Colombier return first;
9638ccd4a63SDavid du Colombier }
9648ccd4a63SDavid du Colombier
9658ccd4a63SDavid du Colombier /*
9668ccd4a63SDavid du Colombier * put a block back to the front of the queue
9678ccd4a63SDavid du Colombier * called with q ilocked
9688ccd4a63SDavid du Colombier */
9698ccd4a63SDavid du Colombier void
qputback(Queue * q,Block * b)9708ccd4a63SDavid du Colombier qputback(Queue *q, Block *b)
9718ccd4a63SDavid du Colombier {
9728ccd4a63SDavid du Colombier b->next = q->bfirst;
9738ccd4a63SDavid du Colombier if(q->bfirst == nil)
9748ccd4a63SDavid du Colombier q->blast = b;
9758ccd4a63SDavid du Colombier q->bfirst = b;
9768ccd4a63SDavid du Colombier q->len += BALLOC(b);
9778ccd4a63SDavid du Colombier q->dlen += BLEN(b);
9788ccd4a63SDavid du Colombier }
9798ccd4a63SDavid du Colombier
9808ccd4a63SDavid du Colombier /*
9818ccd4a63SDavid du Colombier * flow control, get producer going again
9828ccd4a63SDavid du Colombier * called with q ilocked
9838ccd4a63SDavid du Colombier */
9848ccd4a63SDavid du Colombier static void
qwakeup_iunlock(Queue * q)9858ccd4a63SDavid du Colombier qwakeup_iunlock(Queue *q)
9868ccd4a63SDavid du Colombier {
9878ccd4a63SDavid du Colombier int dowakeup = 0;
9888ccd4a63SDavid du Colombier
9898ccd4a63SDavid du Colombier /* if writer flow controlled, restart */
9908ccd4a63SDavid du Colombier if((q->state & Qflow) && q->len < q->limit/2){
9918ccd4a63SDavid du Colombier q->state &= ~Qflow;
9928ccd4a63SDavid du Colombier dowakeup = 1;
9938ccd4a63SDavid du Colombier }
9948ccd4a63SDavid du Colombier
9958ccd4a63SDavid du Colombier iunlock(&q->lk);
9968ccd4a63SDavid du Colombier
9978ccd4a63SDavid du Colombier /* wakeup flow controlled writers */
9988ccd4a63SDavid du Colombier if(dowakeup){
9998ccd4a63SDavid du Colombier if(q->kick)
10008ccd4a63SDavid du Colombier q->kick(q->arg);
10018ccd4a63SDavid du Colombier wakeup(&q->wr);
10028ccd4a63SDavid du Colombier }
10038ccd4a63SDavid du Colombier }
10048ccd4a63SDavid du Colombier
10058ccd4a63SDavid du Colombier /*
10068ccd4a63SDavid du Colombier * get next block from a queue (up to a limit)
10078ccd4a63SDavid du Colombier */
10088ccd4a63SDavid du Colombier Block*
qbread(Queue * q,int len)10098ccd4a63SDavid du Colombier qbread(Queue *q, int len)
10108ccd4a63SDavid du Colombier {
10118ccd4a63SDavid du Colombier Block *b, *nb;
10128ccd4a63SDavid du Colombier int n;
10138ccd4a63SDavid du Colombier
10148ccd4a63SDavid du Colombier qlock(&q->rlock);
10158ccd4a63SDavid du Colombier if(waserror()){
10168ccd4a63SDavid du Colombier qunlock(&q->rlock);
10178ccd4a63SDavid du Colombier nexterror();
10188ccd4a63SDavid du Colombier }
10198ccd4a63SDavid du Colombier
10208ccd4a63SDavid du Colombier ilock(&q->lk);
10218ccd4a63SDavid du Colombier switch(qwait(q)){
10228ccd4a63SDavid du Colombier case 0:
10238ccd4a63SDavid du Colombier /* queue closed */
10248ccd4a63SDavid du Colombier iunlock(&q->lk);
10258ccd4a63SDavid du Colombier qunlock(&q->rlock);
10268ccd4a63SDavid du Colombier poperror();
10278ccd4a63SDavid du Colombier return nil;
10288ccd4a63SDavid du Colombier case -1:
10298ccd4a63SDavid du Colombier /* multiple reads on a closed queue */
10308ccd4a63SDavid du Colombier iunlock(&q->lk);
10318ccd4a63SDavid du Colombier error(q->err);
10328ccd4a63SDavid du Colombier }
10338ccd4a63SDavid du Colombier
10348ccd4a63SDavid du Colombier /* if we get here, there's at least one block in the queue */
10358ccd4a63SDavid du Colombier b = qremove(q);
10368ccd4a63SDavid du Colombier n = BLEN(b);
10378ccd4a63SDavid du Colombier
10388ccd4a63SDavid du Colombier /* split block if it's too big and this is not a message queue */
10398ccd4a63SDavid du Colombier nb = b;
10408ccd4a63SDavid du Colombier if(n > len){
10418ccd4a63SDavid du Colombier if((q->state&Qmsg) == 0){
10428ccd4a63SDavid du Colombier n -= len;
10438ccd4a63SDavid du Colombier b = allocb(n);
10448ccd4a63SDavid du Colombier memmove(b->wp, nb->rp+len, n);
10458ccd4a63SDavid du Colombier b->wp += n;
10468ccd4a63SDavid du Colombier qputback(q, b);
10478ccd4a63SDavid du Colombier }
10488ccd4a63SDavid du Colombier nb->wp = nb->rp + len;
10498ccd4a63SDavid du Colombier }
10508ccd4a63SDavid du Colombier
10518ccd4a63SDavid du Colombier /* restart producer */
10528ccd4a63SDavid du Colombier qwakeup_iunlock(q);
10538ccd4a63SDavid du Colombier
10548ccd4a63SDavid du Colombier poperror();
10558ccd4a63SDavid du Colombier qunlock(&q->rlock);
10568ccd4a63SDavid du Colombier return nb;
10578ccd4a63SDavid du Colombier }
10588ccd4a63SDavid du Colombier
10598ccd4a63SDavid du Colombier /*
10608ccd4a63SDavid du Colombier * read a queue. if no data is queued, post a Block
10618ccd4a63SDavid du Colombier * and wait on its Rendez.
10628ccd4a63SDavid du Colombier */
10638ccd4a63SDavid du Colombier long
qread(Queue * q,void * vp,int len)10648ccd4a63SDavid du Colombier qread(Queue *q, void *vp, int len)
10658ccd4a63SDavid du Colombier {
10668ccd4a63SDavid du Colombier Block *b, *first, **l;
10678ccd4a63SDavid du Colombier int m, n;
10688ccd4a63SDavid du Colombier
10698ccd4a63SDavid du Colombier qlock(&q->rlock);
10708ccd4a63SDavid du Colombier if(waserror()){
10718ccd4a63SDavid du Colombier qunlock(&q->rlock);
10728ccd4a63SDavid du Colombier nexterror();
10738ccd4a63SDavid du Colombier }
10748ccd4a63SDavid du Colombier
10758ccd4a63SDavid du Colombier ilock(&q->lk);
10768ccd4a63SDavid du Colombier again:
10778ccd4a63SDavid du Colombier switch(qwait(q)){
10788ccd4a63SDavid du Colombier case 0:
10798ccd4a63SDavid du Colombier /* queue closed */
10808ccd4a63SDavid du Colombier iunlock(&q->lk);
10818ccd4a63SDavid du Colombier qunlock(&q->rlock);
10828ccd4a63SDavid du Colombier poperror();
10838ccd4a63SDavid du Colombier return 0;
10848ccd4a63SDavid du Colombier case -1:
10858ccd4a63SDavid du Colombier /* multiple reads on a closed queue */
10868ccd4a63SDavid du Colombier iunlock(&q->lk);
10878ccd4a63SDavid du Colombier error(q->err);
10888ccd4a63SDavid du Colombier }
10898ccd4a63SDavid du Colombier
10908ccd4a63SDavid du Colombier /* if we get here, there's at least one block in the queue */
10918ccd4a63SDavid du Colombier if(q->state & Qcoalesce){
10928ccd4a63SDavid du Colombier /* when coalescing, 0 length blocks just go away */
10938ccd4a63SDavid du Colombier b = q->bfirst;
10948ccd4a63SDavid du Colombier if(BLEN(b) <= 0){
10958ccd4a63SDavid du Colombier freeb(qremove(q));
10968ccd4a63SDavid du Colombier goto again;
10978ccd4a63SDavid du Colombier }
10988ccd4a63SDavid du Colombier
10998ccd4a63SDavid du Colombier /* grab the first block plus as many
11008ccd4a63SDavid du Colombier * following blocks as will completely
11018ccd4a63SDavid du Colombier * fit in the read.
11028ccd4a63SDavid du Colombier */
11038ccd4a63SDavid du Colombier n = 0;
11048ccd4a63SDavid du Colombier l = &first;
11058ccd4a63SDavid du Colombier m = BLEN(b);
11068ccd4a63SDavid du Colombier for(;;) {
11078ccd4a63SDavid du Colombier *l = qremove(q);
11088ccd4a63SDavid du Colombier l = &b->next;
11098ccd4a63SDavid du Colombier n += m;
11108ccd4a63SDavid du Colombier
11118ccd4a63SDavid du Colombier b = q->bfirst;
11128ccd4a63SDavid du Colombier if(b == nil)
11138ccd4a63SDavid du Colombier break;
11148ccd4a63SDavid du Colombier m = BLEN(b);
11158ccd4a63SDavid du Colombier if(n+m > len)
11168ccd4a63SDavid du Colombier break;
11178ccd4a63SDavid du Colombier }
11188ccd4a63SDavid du Colombier } else {
11198ccd4a63SDavid du Colombier first = qremove(q);
11208ccd4a63SDavid du Colombier n = BLEN(first);
11218ccd4a63SDavid du Colombier }
11228ccd4a63SDavid du Colombier
11238ccd4a63SDavid du Colombier /* copy to user space outside of the ilock */
11248ccd4a63SDavid du Colombier iunlock(&q->lk);
11258ccd4a63SDavid du Colombier b = bl2mem(vp, first, len);
11268ccd4a63SDavid du Colombier ilock(&q->lk);
11278ccd4a63SDavid du Colombier
11288ccd4a63SDavid du Colombier /* take care of any left over partial block */
11298ccd4a63SDavid du Colombier if(b != nil){
11308ccd4a63SDavid du Colombier n -= BLEN(b);
11318ccd4a63SDavid du Colombier if(q->state & Qmsg)
11328ccd4a63SDavid du Colombier freeb(b);
11338ccd4a63SDavid du Colombier else
11348ccd4a63SDavid du Colombier qputback(q, b);
11358ccd4a63SDavid du Colombier }
11368ccd4a63SDavid du Colombier
11378ccd4a63SDavid du Colombier /* restart producer */
11388ccd4a63SDavid du Colombier qwakeup_iunlock(q);
11398ccd4a63SDavid du Colombier
11408ccd4a63SDavid du Colombier poperror();
11418ccd4a63SDavid du Colombier qunlock(&q->rlock);
11428ccd4a63SDavid du Colombier return n;
11438ccd4a63SDavid du Colombier }
11448ccd4a63SDavid du Colombier
11458ccd4a63SDavid du Colombier static int
qnotfull(void * a)11468ccd4a63SDavid du Colombier qnotfull(void *a)
11478ccd4a63SDavid du Colombier {
11488ccd4a63SDavid du Colombier Queue *q = a;
11498ccd4a63SDavid du Colombier
11508ccd4a63SDavid du Colombier return q->len < q->limit || (q->state & Qclosed);
11518ccd4a63SDavid du Colombier }
11528ccd4a63SDavid du Colombier
11538ccd4a63SDavid du Colombier ulong noblockcnt;
11548ccd4a63SDavid du Colombier
11558ccd4a63SDavid du Colombier /*
11568ccd4a63SDavid du Colombier * add a block to a queue obeying flow control
11578ccd4a63SDavid du Colombier */
11588ccd4a63SDavid du Colombier long
qbwrite(Queue * q,Block * b)11598ccd4a63SDavid du Colombier qbwrite(Queue *q, Block *b)
11608ccd4a63SDavid du Colombier {
11618ccd4a63SDavid du Colombier int n, dowakeup;
11628ccd4a63SDavid du Colombier
11638ccd4a63SDavid du Colombier n = BLEN(b);
11648ccd4a63SDavid du Colombier
11658ccd4a63SDavid du Colombier if(q->bypass){
11668ccd4a63SDavid du Colombier (*q->bypass)(q->arg, b);
11678ccd4a63SDavid du Colombier return n;
11688ccd4a63SDavid du Colombier }
11698ccd4a63SDavid du Colombier
11708ccd4a63SDavid du Colombier dowakeup = 0;
11718ccd4a63SDavid du Colombier qlock(&q->wlock);
11728ccd4a63SDavid du Colombier if(waserror()){
11738ccd4a63SDavid du Colombier if(b != nil)
11748ccd4a63SDavid du Colombier freeb(b);
11758ccd4a63SDavid du Colombier qunlock(&q->wlock);
11768ccd4a63SDavid du Colombier nexterror();
11778ccd4a63SDavid du Colombier }
11788ccd4a63SDavid du Colombier
11798ccd4a63SDavid du Colombier ilock(&q->lk);
11808ccd4a63SDavid du Colombier
11818ccd4a63SDavid du Colombier /* give up if the queue is closed */
11828ccd4a63SDavid du Colombier if(q->state & Qclosed){
11838ccd4a63SDavid du Colombier iunlock(&q->lk);
11848ccd4a63SDavid du Colombier error(q->err);
11858ccd4a63SDavid du Colombier }
11868ccd4a63SDavid du Colombier
11878ccd4a63SDavid du Colombier /* if nonblocking, don't queue over the limit */
11888ccd4a63SDavid du Colombier if(q->len >= q->limit){
11898ccd4a63SDavid du Colombier if(q->noblock){
11908ccd4a63SDavid du Colombier iunlock(&q->lk);
11918ccd4a63SDavid du Colombier freeb(b);
11928ccd4a63SDavid du Colombier noblockcnt += n;
11938ccd4a63SDavid du Colombier qunlock(&q->wlock);
11948ccd4a63SDavid du Colombier poperror();
11958ccd4a63SDavid du Colombier return n;
11968ccd4a63SDavid du Colombier }
11978ccd4a63SDavid du Colombier }
11988ccd4a63SDavid du Colombier
11998ccd4a63SDavid du Colombier /* queue the block */
12008ccd4a63SDavid du Colombier if(q->bfirst)
12018ccd4a63SDavid du Colombier q->blast->next = b;
12028ccd4a63SDavid du Colombier else
12038ccd4a63SDavid du Colombier q->bfirst = b;
12048ccd4a63SDavid du Colombier q->blast = b;
12058ccd4a63SDavid du Colombier b->next = 0;
12068ccd4a63SDavid du Colombier q->len += BALLOC(b);
12078ccd4a63SDavid du Colombier q->dlen += n;
12088ccd4a63SDavid du Colombier QDEBUG checkb(b, "qbwrite");
12098ccd4a63SDavid du Colombier b = nil;
12108ccd4a63SDavid du Colombier
12118ccd4a63SDavid du Colombier /* make sure other end gets awakened */
12128ccd4a63SDavid du Colombier if(q->state & Qstarve){
12138ccd4a63SDavid du Colombier q->state &= ~Qstarve;
12148ccd4a63SDavid du Colombier dowakeup = 1;
12158ccd4a63SDavid du Colombier }
12168ccd4a63SDavid du Colombier iunlock(&q->lk);
12178ccd4a63SDavid du Colombier
12188ccd4a63SDavid du Colombier /* get output going again */
12198ccd4a63SDavid du Colombier if(q->kick && (dowakeup || (q->state&Qkick)))
12208ccd4a63SDavid du Colombier q->kick(q->arg);
12218ccd4a63SDavid du Colombier
12228ccd4a63SDavid du Colombier /* wakeup anyone consuming at the other end */
12238ccd4a63SDavid du Colombier if(dowakeup){
12248ccd4a63SDavid du Colombier wakeup(&q->rr);
12258ccd4a63SDavid du Colombier
12268ccd4a63SDavid du Colombier /* if we just wokeup a higher priority process, let it run */
12278ccd4a63SDavid du Colombier /*
12288ccd4a63SDavid du Colombier p = wakeup(&q->rr);
12298ccd4a63SDavid du Colombier if(p != nil && p->priority > up->priority)
12308ccd4a63SDavid du Colombier sched();
12318ccd4a63SDavid du Colombier */
12328ccd4a63SDavid du Colombier }
12338ccd4a63SDavid du Colombier
12348ccd4a63SDavid du Colombier /*
12358ccd4a63SDavid du Colombier * flow control, wait for queue to get below the limit
12368ccd4a63SDavid du Colombier * before allowing the process to continue and queue
12378ccd4a63SDavid du Colombier * more. We do this here so that postnote can only
12388ccd4a63SDavid du Colombier * interrupt us after the data has been queued. This
12398ccd4a63SDavid du Colombier * means that things like 9p flushes and ssl messages
12408ccd4a63SDavid du Colombier * will not be disrupted by software interrupts.
12418ccd4a63SDavid du Colombier *
12428ccd4a63SDavid du Colombier * Note - this is moderately dangerous since a process
12438ccd4a63SDavid du Colombier * that keeps getting interrupted and rewriting will
12448ccd4a63SDavid du Colombier * queue infinite crud.
12458ccd4a63SDavid du Colombier */
12468ccd4a63SDavid du Colombier for(;;){
12478ccd4a63SDavid du Colombier if(q->noblock || qnotfull(q))
12488ccd4a63SDavid du Colombier break;
12498ccd4a63SDavid du Colombier
12508ccd4a63SDavid du Colombier ilock(&q->lk);
12518ccd4a63SDavid du Colombier q->state |= Qflow;
12528ccd4a63SDavid du Colombier iunlock(&q->lk);
12538ccd4a63SDavid du Colombier sleep(&q->wr, qnotfull, q);
12548ccd4a63SDavid du Colombier }
12558ccd4a63SDavid du Colombier USED(b);
12568ccd4a63SDavid du Colombier
12578ccd4a63SDavid du Colombier qunlock(&q->wlock);
12588ccd4a63SDavid du Colombier poperror();
12598ccd4a63SDavid du Colombier return n;
12608ccd4a63SDavid du Colombier }
12618ccd4a63SDavid du Colombier
12628ccd4a63SDavid du Colombier /*
12638ccd4a63SDavid du Colombier * write to a queue. only Maxatomic bytes at a time is atomic.
12648ccd4a63SDavid du Colombier */
12658ccd4a63SDavid du Colombier int
qwrite(Queue * q,void * vp,int len)12668ccd4a63SDavid du Colombier qwrite(Queue *q, void *vp, int len)
12678ccd4a63SDavid du Colombier {
12688ccd4a63SDavid du Colombier int n, sofar;
12698ccd4a63SDavid du Colombier Block *b;
12708ccd4a63SDavid du Colombier uchar *p = vp;
12718ccd4a63SDavid du Colombier
12728ccd4a63SDavid du Colombier QDEBUG if(!islo())
1273*ec59a3ddSDavid du Colombier print("qwrite hi %p\n", getcallerpc(&q));
12748ccd4a63SDavid du Colombier
12758ccd4a63SDavid du Colombier sofar = 0;
12768ccd4a63SDavid du Colombier do {
12778ccd4a63SDavid du Colombier n = len-sofar;
12788ccd4a63SDavid du Colombier if(n > Maxatomic)
12798ccd4a63SDavid du Colombier n = Maxatomic;
12808ccd4a63SDavid du Colombier
12818ccd4a63SDavid du Colombier b = allocb(n);
12828ccd4a63SDavid du Colombier /* setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); */
12838ccd4a63SDavid du Colombier if(waserror()){
12848ccd4a63SDavid du Colombier freeb(b);
12858ccd4a63SDavid du Colombier nexterror();
12868ccd4a63SDavid du Colombier }
12878ccd4a63SDavid du Colombier memmove(b->wp, p+sofar, n);
12888ccd4a63SDavid du Colombier poperror();
12898ccd4a63SDavid du Colombier b->wp += n;
12908ccd4a63SDavid du Colombier
12918ccd4a63SDavid du Colombier qbwrite(q, b);
12928ccd4a63SDavid du Colombier
12938ccd4a63SDavid du Colombier sofar += n;
12948ccd4a63SDavid du Colombier } while(sofar < len && (q->state & Qmsg) == 0);
12958ccd4a63SDavid du Colombier
12968ccd4a63SDavid du Colombier return len;
12978ccd4a63SDavid du Colombier }
12988ccd4a63SDavid du Colombier
12998ccd4a63SDavid du Colombier /*
13008ccd4a63SDavid du Colombier * used by print() to write to a queue. Since we may be splhi or not in
13018ccd4a63SDavid du Colombier * a process, don't qlock.
13028ccd4a63SDavid du Colombier */
13038ccd4a63SDavid du Colombier int
qiwrite(Queue * q,void * vp,int len)13048ccd4a63SDavid du Colombier qiwrite(Queue *q, void *vp, int len)
13058ccd4a63SDavid du Colombier {
13068ccd4a63SDavid du Colombier int n, sofar, dowakeup;
13078ccd4a63SDavid du Colombier Block *b;
13088ccd4a63SDavid du Colombier uchar *p = vp;
13098ccd4a63SDavid du Colombier
13108ccd4a63SDavid du Colombier dowakeup = 0;
13118ccd4a63SDavid du Colombier
13128ccd4a63SDavid du Colombier sofar = 0;
13138ccd4a63SDavid du Colombier do {
13148ccd4a63SDavid du Colombier n = len-sofar;
13158ccd4a63SDavid du Colombier if(n > Maxatomic)
13168ccd4a63SDavid du Colombier n = Maxatomic;
13178ccd4a63SDavid du Colombier
13188ccd4a63SDavid du Colombier b = iallocb(n);
13198ccd4a63SDavid du Colombier if(b == nil)
13208ccd4a63SDavid du Colombier break;
13218ccd4a63SDavid du Colombier memmove(b->wp, p+sofar, n);
13228ccd4a63SDavid du Colombier b->wp += n;
13238ccd4a63SDavid du Colombier
13248ccd4a63SDavid du Colombier ilock(&q->lk);
13258ccd4a63SDavid du Colombier
13268ccd4a63SDavid du Colombier QDEBUG checkb(b, "qiwrite");
13278ccd4a63SDavid du Colombier if(q->bfirst)
13288ccd4a63SDavid du Colombier q->blast->next = b;
13298ccd4a63SDavid du Colombier else
13308ccd4a63SDavid du Colombier q->bfirst = b;
13318ccd4a63SDavid du Colombier q->blast = b;
13328ccd4a63SDavid du Colombier q->len += BALLOC(b);
13338ccd4a63SDavid du Colombier q->dlen += n;
13348ccd4a63SDavid du Colombier
13358ccd4a63SDavid du Colombier if(q->state & Qstarve){
13368ccd4a63SDavid du Colombier q->state &= ~Qstarve;
13378ccd4a63SDavid du Colombier dowakeup = 1;
13388ccd4a63SDavid du Colombier }
13398ccd4a63SDavid du Colombier
13408ccd4a63SDavid du Colombier iunlock(&q->lk);
13418ccd4a63SDavid du Colombier
13428ccd4a63SDavid du Colombier if(dowakeup){
13438ccd4a63SDavid du Colombier if(q->kick)
13448ccd4a63SDavid du Colombier q->kick(q->arg);
13458ccd4a63SDavid du Colombier wakeup(&q->rr);
13468ccd4a63SDavid du Colombier }
13478ccd4a63SDavid du Colombier
13488ccd4a63SDavid du Colombier sofar += n;
13498ccd4a63SDavid du Colombier } while(sofar < len && (q->state & Qmsg) == 0);
13508ccd4a63SDavid du Colombier
13518ccd4a63SDavid du Colombier return sofar;
13528ccd4a63SDavid du Colombier }
13538ccd4a63SDavid du Colombier
13548ccd4a63SDavid du Colombier /*
13558ccd4a63SDavid du Colombier * be extremely careful when calling this,
13568ccd4a63SDavid du Colombier * as there is no reference accounting
13578ccd4a63SDavid du Colombier */
13588ccd4a63SDavid du Colombier void
qfree(Queue * q)13598ccd4a63SDavid du Colombier qfree(Queue *q)
13608ccd4a63SDavid du Colombier {
13618ccd4a63SDavid du Colombier qclose(q);
13628ccd4a63SDavid du Colombier free(q);
13638ccd4a63SDavid du Colombier }
13648ccd4a63SDavid du Colombier
13658ccd4a63SDavid du Colombier /*
13668ccd4a63SDavid du Colombier * Mark a queue as closed. No further IO is permitted.
13678ccd4a63SDavid du Colombier * All blocks are released.
13688ccd4a63SDavid du Colombier */
13698ccd4a63SDavid du Colombier void
qclose(Queue * q)13708ccd4a63SDavid du Colombier qclose(Queue *q)
13718ccd4a63SDavid du Colombier {
13728ccd4a63SDavid du Colombier Block *bfirst;
13738ccd4a63SDavid du Colombier
13748ccd4a63SDavid du Colombier if(q == nil)
13758ccd4a63SDavid du Colombier return;
13768ccd4a63SDavid du Colombier
13778ccd4a63SDavid du Colombier /* mark it */
13788ccd4a63SDavid du Colombier ilock(&q->lk);
13798ccd4a63SDavid du Colombier q->state |= Qclosed;
13808ccd4a63SDavid du Colombier q->state &= ~(Qflow|Qstarve);
13818ccd4a63SDavid du Colombier strcpy(q->err, Ehungup);
13828ccd4a63SDavid du Colombier bfirst = q->bfirst;
13838ccd4a63SDavid du Colombier q->bfirst = 0;
13848ccd4a63SDavid du Colombier q->len = 0;
13858ccd4a63SDavid du Colombier q->dlen = 0;
13868ccd4a63SDavid du Colombier q->noblock = 0;
13878ccd4a63SDavid du Colombier iunlock(&q->lk);
13888ccd4a63SDavid du Colombier
13898ccd4a63SDavid du Colombier /* free queued blocks */
13908ccd4a63SDavid du Colombier freeblist(bfirst);
13918ccd4a63SDavid du Colombier
13928ccd4a63SDavid du Colombier /* wake up readers/writers */
13938ccd4a63SDavid du Colombier wakeup(&q->rr);
13948ccd4a63SDavid du Colombier wakeup(&q->wr);
13958ccd4a63SDavid du Colombier }
13968ccd4a63SDavid du Colombier
13978ccd4a63SDavid du Colombier /*
13988ccd4a63SDavid du Colombier * Mark a queue as closed. Wakeup any readers. Don't remove queued
13998ccd4a63SDavid du Colombier * blocks.
14008ccd4a63SDavid du Colombier */
14018ccd4a63SDavid du Colombier void
qhangup(Queue * q,char * msg)14028ccd4a63SDavid du Colombier qhangup(Queue *q, char *msg)
14038ccd4a63SDavid du Colombier {
14048ccd4a63SDavid du Colombier /* mark it */
14058ccd4a63SDavid du Colombier ilock(&q->lk);
14068ccd4a63SDavid du Colombier q->state |= Qclosed;
14078ccd4a63SDavid du Colombier if(msg == 0 || *msg == 0)
14088ccd4a63SDavid du Colombier strcpy(q->err, Ehungup);
14098ccd4a63SDavid du Colombier else
14108ccd4a63SDavid du Colombier strncpy(q->err, msg, ERRMAX-1);
14118ccd4a63SDavid du Colombier iunlock(&q->lk);
14128ccd4a63SDavid du Colombier
14138ccd4a63SDavid du Colombier /* wake up readers/writers */
14148ccd4a63SDavid du Colombier wakeup(&q->rr);
14158ccd4a63SDavid du Colombier wakeup(&q->wr);
14168ccd4a63SDavid du Colombier }
14178ccd4a63SDavid du Colombier
14188ccd4a63SDavid du Colombier /*
14198ccd4a63SDavid du Colombier * return non-zero if the q is hungup
14208ccd4a63SDavid du Colombier */
14218ccd4a63SDavid du Colombier int
qisclosed(Queue * q)14228ccd4a63SDavid du Colombier qisclosed(Queue *q)
14238ccd4a63SDavid du Colombier {
14248ccd4a63SDavid du Colombier return q->state & Qclosed;
14258ccd4a63SDavid du Colombier }
14268ccd4a63SDavid du Colombier
14278ccd4a63SDavid du Colombier /*
14288ccd4a63SDavid du Colombier * mark a queue as no longer hung up
14298ccd4a63SDavid du Colombier */
14308ccd4a63SDavid du Colombier void
qreopen(Queue * q)14318ccd4a63SDavid du Colombier qreopen(Queue *q)
14328ccd4a63SDavid du Colombier {
14338ccd4a63SDavid du Colombier ilock(&q->lk);
14348ccd4a63SDavid du Colombier q->state &= ~Qclosed;
14358ccd4a63SDavid du Colombier q->state |= Qstarve;
14368ccd4a63SDavid du Colombier q->eof = 0;
14378ccd4a63SDavid du Colombier q->limit = q->inilim;
14388ccd4a63SDavid du Colombier iunlock(&q->lk);
14398ccd4a63SDavid du Colombier }
14408ccd4a63SDavid du Colombier
14418ccd4a63SDavid du Colombier /*
14428ccd4a63SDavid du Colombier * return bytes queued
14438ccd4a63SDavid du Colombier */
14448ccd4a63SDavid du Colombier int
qlen(Queue * q)14458ccd4a63SDavid du Colombier qlen(Queue *q)
14468ccd4a63SDavid du Colombier {
14478ccd4a63SDavid du Colombier return q->dlen;
14488ccd4a63SDavid du Colombier }
14498ccd4a63SDavid du Colombier
14508ccd4a63SDavid du Colombier /*
14518ccd4a63SDavid du Colombier * return space remaining before flow control
14528ccd4a63SDavid du Colombier */
14538ccd4a63SDavid du Colombier int
qwindow(Queue * q)14548ccd4a63SDavid du Colombier qwindow(Queue *q)
14558ccd4a63SDavid du Colombier {
14568ccd4a63SDavid du Colombier int l;
14578ccd4a63SDavid du Colombier
14588ccd4a63SDavid du Colombier l = q->limit - q->len;
14598ccd4a63SDavid du Colombier if(l < 0)
14608ccd4a63SDavid du Colombier l = 0;
14618ccd4a63SDavid du Colombier return l;
14628ccd4a63SDavid du Colombier }
14638ccd4a63SDavid du Colombier
14648ccd4a63SDavid du Colombier /*
14658ccd4a63SDavid du Colombier * return true if we can read without blocking
14668ccd4a63SDavid du Colombier */
14678ccd4a63SDavid du Colombier int
qcanread(Queue * q)14688ccd4a63SDavid du Colombier qcanread(Queue *q)
14698ccd4a63SDavid du Colombier {
14708ccd4a63SDavid du Colombier return q->bfirst!=0;
14718ccd4a63SDavid du Colombier }
14728ccd4a63SDavid du Colombier
14738ccd4a63SDavid du Colombier /*
14748ccd4a63SDavid du Colombier * change queue limit
14758ccd4a63SDavid du Colombier */
14768ccd4a63SDavid du Colombier void
qsetlimit(Queue * q,int limit)14778ccd4a63SDavid du Colombier qsetlimit(Queue *q, int limit)
14788ccd4a63SDavid du Colombier {
14798ccd4a63SDavid du Colombier q->limit = limit;
14808ccd4a63SDavid du Colombier }
14818ccd4a63SDavid du Colombier
14828ccd4a63SDavid du Colombier /*
14838ccd4a63SDavid du Colombier * set blocking/nonblocking
14848ccd4a63SDavid du Colombier */
14858ccd4a63SDavid du Colombier void
qnoblock(Queue * q,int onoff)14868ccd4a63SDavid du Colombier qnoblock(Queue *q, int onoff)
14878ccd4a63SDavid du Colombier {
14888ccd4a63SDavid du Colombier q->noblock = onoff;
14898ccd4a63SDavid du Colombier }
14908ccd4a63SDavid du Colombier
14918ccd4a63SDavid du Colombier /*
14928ccd4a63SDavid du Colombier * flush the output queue
14938ccd4a63SDavid du Colombier */
14948ccd4a63SDavid du Colombier void
qflush(Queue * q)14958ccd4a63SDavid du Colombier qflush(Queue *q)
14968ccd4a63SDavid du Colombier {
14978ccd4a63SDavid du Colombier Block *bfirst;
14988ccd4a63SDavid du Colombier
14998ccd4a63SDavid du Colombier /* mark it */
15008ccd4a63SDavid du Colombier ilock(&q->lk);
15018ccd4a63SDavid du Colombier bfirst = q->bfirst;
15028ccd4a63SDavid du Colombier q->bfirst = 0;
15038ccd4a63SDavid du Colombier q->len = 0;
15048ccd4a63SDavid du Colombier q->dlen = 0;
15058ccd4a63SDavid du Colombier iunlock(&q->lk);
15068ccd4a63SDavid du Colombier
15078ccd4a63SDavid du Colombier /* free queued blocks */
15088ccd4a63SDavid du Colombier freeblist(bfirst);
15098ccd4a63SDavid du Colombier
15108ccd4a63SDavid du Colombier /* wake up readers/writers */
15118ccd4a63SDavid du Colombier wakeup(&q->wr);
15128ccd4a63SDavid du Colombier }
15138ccd4a63SDavid du Colombier
15148ccd4a63SDavid du Colombier int
qfull(Queue * q)15158ccd4a63SDavid du Colombier qfull(Queue *q)
15168ccd4a63SDavid du Colombier {
15178ccd4a63SDavid du Colombier return q->state & Qflow;
15188ccd4a63SDavid du Colombier }
15198ccd4a63SDavid du Colombier
15208ccd4a63SDavid du Colombier int
qstate(Queue * q)15218ccd4a63SDavid du Colombier qstate(Queue *q)
15228ccd4a63SDavid du Colombier {
15238ccd4a63SDavid du Colombier return q->state;
15248ccd4a63SDavid du Colombier }
1525