1*368c31abSDavid du Colombier #include "stdinc.h"
2*368c31abSDavid du Colombier #include "dat.h"
3*368c31abSDavid du Colombier #include "fns.h"
4*368c31abSDavid du Colombier
/* forward typedefs so the structs below can be named without the struct keyword */
typedef struct WLump WLump;
typedef struct LumpQueue LumpQueue;
7*368c31abSDavid du Colombier
enum
{
	/*
	 * Number of slots in each write queue's ring.  Indices are wrapped
	 * with "& (MaxLumpQ - 1)", so this must be a power of two.  The ring
	 * is treated as full when advancing w would reach r, so at most
	 * MaxLumpQ - 1 lumps are queued at once.
	 */
	MaxLumpQ = 1 << 3
};
12*368c31abSDavid du Colombier
13*368c31abSDavid du Colombier struct WLump
14*368c31abSDavid du Colombier {
15*368c31abSDavid du Colombier Lump *u;
16*368c31abSDavid du Colombier Packet *p;
17*368c31abSDavid du Colombier int creator;
18*368c31abSDavid du Colombier int gen;
19*368c31abSDavid du Colombier uint ms;
20*368c31abSDavid du Colombier };
21*368c31abSDavid du Colombier
22*368c31abSDavid du Colombier struct LumpQueue
23*368c31abSDavid du Colombier {
24*368c31abSDavid du Colombier QLock lock;
25*368c31abSDavid du Colombier Rendez flush;
26*368c31abSDavid du Colombier Rendez full;
27*368c31abSDavid du Colombier Rendez empty;
28*368c31abSDavid du Colombier WLump q[MaxLumpQ];
29*368c31abSDavid du Colombier int w;
30*368c31abSDavid du Colombier int r;
31*368c31abSDavid du Colombier };
32*368c31abSDavid du Colombier
33*368c31abSDavid du Colombier static LumpQueue *lumpqs;
34*368c31abSDavid du Colombier static int nqs;
35*368c31abSDavid du Colombier
36*368c31abSDavid du Colombier static QLock glk;
37*368c31abSDavid du Colombier static int gen;
38*368c31abSDavid du Colombier
39*368c31abSDavid du Colombier static void queueproc(void *vq);
40*368c31abSDavid du Colombier
41*368c31abSDavid du Colombier int
initlumpqueues(int nq)42*368c31abSDavid du Colombier initlumpqueues(int nq)
43*368c31abSDavid du Colombier {
44*368c31abSDavid du Colombier LumpQueue *q;
45*368c31abSDavid du Colombier
46*368c31abSDavid du Colombier int i;
47*368c31abSDavid du Colombier nqs = nq;
48*368c31abSDavid du Colombier
49*368c31abSDavid du Colombier lumpqs = MKNZ(LumpQueue, nq);
50*368c31abSDavid du Colombier
51*368c31abSDavid du Colombier for(i = 0; i < nq; i++){
52*368c31abSDavid du Colombier q = &lumpqs[i];
53*368c31abSDavid du Colombier q->full.l = &q->lock;
54*368c31abSDavid du Colombier q->empty.l = &q->lock;
55*368c31abSDavid du Colombier q->flush.l = &q->lock;
56*368c31abSDavid du Colombier
57*368c31abSDavid du Colombier if(vtproc(queueproc, q) < 0){
58*368c31abSDavid du Colombier seterr(EOk, "can't start write queue slave: %r");
59*368c31abSDavid du Colombier return -1;
60*368c31abSDavid du Colombier }
61*368c31abSDavid du Colombier }
62*368c31abSDavid du Colombier
63*368c31abSDavid du Colombier return 0;
64*368c31abSDavid du Colombier }
65*368c31abSDavid du Colombier
66*368c31abSDavid du Colombier /*
67*368c31abSDavid du Colombier * queue a lump & it's packet data for writing
68*368c31abSDavid du Colombier */
69*368c31abSDavid du Colombier int
queuewrite(Lump * u,Packet * p,int creator,uint ms)70*368c31abSDavid du Colombier queuewrite(Lump *u, Packet *p, int creator, uint ms)
71*368c31abSDavid du Colombier {
72*368c31abSDavid du Colombier LumpQueue *q;
73*368c31abSDavid du Colombier int i;
74*368c31abSDavid du Colombier
75*368c31abSDavid du Colombier trace(TraceProc, "queuewrite");
76*368c31abSDavid du Colombier i = indexsect(mainindex, u->score);
77*368c31abSDavid du Colombier if(i < 0 || i >= nqs){
78*368c31abSDavid du Colombier seterr(EBug, "internal error: illegal index section in queuewrite");
79*368c31abSDavid du Colombier return -1;
80*368c31abSDavid du Colombier }
81*368c31abSDavid du Colombier
82*368c31abSDavid du Colombier q = &lumpqs[i];
83*368c31abSDavid du Colombier
84*368c31abSDavid du Colombier qlock(&q->lock);
85*368c31abSDavid du Colombier while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
86*368c31abSDavid du Colombier trace(TraceProc, "queuewrite sleep");
87*368c31abSDavid du Colombier rsleep(&q->full);
88*368c31abSDavid du Colombier }
89*368c31abSDavid du Colombier
90*368c31abSDavid du Colombier q->q[q->w].u = u;
91*368c31abSDavid du Colombier q->q[q->w].p = p;
92*368c31abSDavid du Colombier q->q[q->w].creator = creator;
93*368c31abSDavid du Colombier q->q[q->w].ms = ms;
94*368c31abSDavid du Colombier q->q[q->w].gen = gen;
95*368c31abSDavid du Colombier q->w = (q->w + 1) & (MaxLumpQ - 1);
96*368c31abSDavid du Colombier
97*368c31abSDavid du Colombier trace(TraceProc, "queuewrite wakeup");
98*368c31abSDavid du Colombier rwakeup(&q->empty);
99*368c31abSDavid du Colombier
100*368c31abSDavid du Colombier qunlock(&q->lock);
101*368c31abSDavid du Colombier
102*368c31abSDavid du Colombier return 0;
103*368c31abSDavid du Colombier }
104*368c31abSDavid du Colombier
105*368c31abSDavid du Colombier void
flushqueue(void)106*368c31abSDavid du Colombier flushqueue(void)
107*368c31abSDavid du Colombier {
108*368c31abSDavid du Colombier int i;
109*368c31abSDavid du Colombier LumpQueue *q;
110*368c31abSDavid du Colombier
111*368c31abSDavid du Colombier if(!lumpqs)
112*368c31abSDavid du Colombier return;
113*368c31abSDavid du Colombier
114*368c31abSDavid du Colombier trace(TraceProc, "flushqueue");
115*368c31abSDavid du Colombier
116*368c31abSDavid du Colombier qlock(&glk);
117*368c31abSDavid du Colombier gen++;
118*368c31abSDavid du Colombier qunlock(&glk);
119*368c31abSDavid du Colombier
120*368c31abSDavid du Colombier for(i=0; i<mainindex->nsects; i++){
121*368c31abSDavid du Colombier q = &lumpqs[i];
122*368c31abSDavid du Colombier qlock(&q->lock);
123*368c31abSDavid du Colombier while(q->w != q->r && gen - q->q[q->r].gen > 0){
124*368c31abSDavid du Colombier trace(TraceProc, "flushqueue sleep q%d", i);
125*368c31abSDavid du Colombier rsleep(&q->flush);
126*368c31abSDavid du Colombier }
127*368c31abSDavid du Colombier qunlock(&q->lock);
128*368c31abSDavid du Colombier }
129*368c31abSDavid du Colombier }
130*368c31abSDavid du Colombier
131*368c31abSDavid du Colombier static void
queueproc(void * vq)132*368c31abSDavid du Colombier queueproc(void *vq)
133*368c31abSDavid du Colombier {
134*368c31abSDavid du Colombier LumpQueue *q;
135*368c31abSDavid du Colombier Lump *u;
136*368c31abSDavid du Colombier Packet *p;
137*368c31abSDavid du Colombier int creator;
138*368c31abSDavid du Colombier uint ms;
139*368c31abSDavid du Colombier
140*368c31abSDavid du Colombier threadsetname("queueproc");
141*368c31abSDavid du Colombier
142*368c31abSDavid du Colombier q = vq;
143*368c31abSDavid du Colombier for(;;){
144*368c31abSDavid du Colombier qlock(&q->lock);
145*368c31abSDavid du Colombier while(q->w == q->r){
146*368c31abSDavid du Colombier trace(TraceProc, "queueproc sleep empty");
147*368c31abSDavid du Colombier rsleep(&q->empty);
148*368c31abSDavid du Colombier }
149*368c31abSDavid du Colombier
150*368c31abSDavid du Colombier u = q->q[q->r].u;
151*368c31abSDavid du Colombier p = q->q[q->r].p;
152*368c31abSDavid du Colombier creator = q->q[q->r].creator;
153*368c31abSDavid du Colombier ms = q->q[q->r].ms;
154*368c31abSDavid du Colombier
155*368c31abSDavid du Colombier q->r = (q->r + 1) & (MaxLumpQ - 1);
156*368c31abSDavid du Colombier trace(TraceProc, "queueproc wakeup flush");
157*368c31abSDavid du Colombier rwakeupall(&q->flush);
158*368c31abSDavid du Colombier
159*368c31abSDavid du Colombier trace(TraceProc, "queueproc wakeup full");
160*368c31abSDavid du Colombier rwakeup(&q->full);
161*368c31abSDavid du Colombier
162*368c31abSDavid du Colombier qunlock(&q->lock);
163*368c31abSDavid du Colombier
164*368c31abSDavid du Colombier trace(TraceProc, "queueproc writelump %V", u->score);
165*368c31abSDavid du Colombier if(writeqlump(u, p, creator, ms) < 0)
166*368c31abSDavid du Colombier fprint(2, "failed to write lump for %V: %r", u->score);
167*368c31abSDavid du Colombier trace(TraceProc, "queueproc wrotelump %V", u->score);
168*368c31abSDavid du Colombier
169*368c31abSDavid du Colombier putlump(u);
170*368c31abSDavid du Colombier }
171*368c31abSDavid du Colombier }
172