xref: /plan9/sys/src/cmd/venti/srv/lumpqueue.c (revision 368c31ab13393dea083228fdd1c3445076f83a4b)
1*368c31abSDavid du Colombier #include "stdinc.h"
2*368c31abSDavid du Colombier #include "dat.h"
3*368c31abSDavid du Colombier #include "fns.h"
4*368c31abSDavid du Colombier 
5*368c31abSDavid du Colombier typedef struct LumpQueue	LumpQueue;
6*368c31abSDavid du Colombier typedef struct WLump		WLump;
7*368c31abSDavid du Colombier 
8*368c31abSDavid du Colombier enum
9*368c31abSDavid du Colombier {
10*368c31abSDavid du Colombier 	MaxLumpQ	= 1 << 3	/* max. lumps on a single write queue, must be pow 2 */
11*368c31abSDavid du Colombier };
12*368c31abSDavid du Colombier 
13*368c31abSDavid du Colombier struct WLump
14*368c31abSDavid du Colombier {
15*368c31abSDavid du Colombier 	Lump	*u;
16*368c31abSDavid du Colombier 	Packet	*p;
17*368c31abSDavid du Colombier 	int	creator;
18*368c31abSDavid du Colombier 	int	gen;
19*368c31abSDavid du Colombier 	uint	ms;
20*368c31abSDavid du Colombier };
21*368c31abSDavid du Colombier 
22*368c31abSDavid du Colombier struct LumpQueue
23*368c31abSDavid du Colombier {
24*368c31abSDavid du Colombier 	QLock	lock;
25*368c31abSDavid du Colombier 	Rendez 	flush;
26*368c31abSDavid du Colombier 	Rendez	full;
27*368c31abSDavid du Colombier 	Rendez	empty;
28*368c31abSDavid du Colombier 	WLump	q[MaxLumpQ];
29*368c31abSDavid du Colombier 	int	w;
30*368c31abSDavid du Colombier 	int	r;
31*368c31abSDavid du Colombier };
32*368c31abSDavid du Colombier 
33*368c31abSDavid du Colombier static LumpQueue	*lumpqs;
34*368c31abSDavid du Colombier static int		nqs;
35*368c31abSDavid du Colombier 
36*368c31abSDavid du Colombier static QLock		glk;
37*368c31abSDavid du Colombier static int		gen;
38*368c31abSDavid du Colombier 
39*368c31abSDavid du Colombier static void	queueproc(void *vq);
40*368c31abSDavid du Colombier 
41*368c31abSDavid du Colombier int
initlumpqueues(int nq)42*368c31abSDavid du Colombier initlumpqueues(int nq)
43*368c31abSDavid du Colombier {
44*368c31abSDavid du Colombier 	LumpQueue *q;
45*368c31abSDavid du Colombier 
46*368c31abSDavid du Colombier 	int i;
47*368c31abSDavid du Colombier 	nqs = nq;
48*368c31abSDavid du Colombier 
49*368c31abSDavid du Colombier 	lumpqs = MKNZ(LumpQueue, nq);
50*368c31abSDavid du Colombier 
51*368c31abSDavid du Colombier 	for(i = 0; i < nq; i++){
52*368c31abSDavid du Colombier 		q = &lumpqs[i];
53*368c31abSDavid du Colombier 		q->full.l = &q->lock;
54*368c31abSDavid du Colombier 		q->empty.l = &q->lock;
55*368c31abSDavid du Colombier 		q->flush.l = &q->lock;
56*368c31abSDavid du Colombier 
57*368c31abSDavid du Colombier 		if(vtproc(queueproc, q) < 0){
58*368c31abSDavid du Colombier 			seterr(EOk, "can't start write queue slave: %r");
59*368c31abSDavid du Colombier 			return -1;
60*368c31abSDavid du Colombier 		}
61*368c31abSDavid du Colombier 	}
62*368c31abSDavid du Colombier 
63*368c31abSDavid du Colombier 	return 0;
64*368c31abSDavid du Colombier }
65*368c31abSDavid du Colombier 
66*368c31abSDavid du Colombier /*
67*368c31abSDavid du Colombier  * queue a lump & it's packet data for writing
68*368c31abSDavid du Colombier  */
69*368c31abSDavid du Colombier int
queuewrite(Lump * u,Packet * p,int creator,uint ms)70*368c31abSDavid du Colombier queuewrite(Lump *u, Packet *p, int creator, uint ms)
71*368c31abSDavid du Colombier {
72*368c31abSDavid du Colombier 	LumpQueue *q;
73*368c31abSDavid du Colombier 	int i;
74*368c31abSDavid du Colombier 
75*368c31abSDavid du Colombier 	trace(TraceProc, "queuewrite");
76*368c31abSDavid du Colombier 	i = indexsect(mainindex, u->score);
77*368c31abSDavid du Colombier 	if(i < 0 || i >= nqs){
78*368c31abSDavid du Colombier 		seterr(EBug, "internal error: illegal index section in queuewrite");
79*368c31abSDavid du Colombier 		return -1;
80*368c31abSDavid du Colombier 	}
81*368c31abSDavid du Colombier 
82*368c31abSDavid du Colombier 	q = &lumpqs[i];
83*368c31abSDavid du Colombier 
84*368c31abSDavid du Colombier 	qlock(&q->lock);
85*368c31abSDavid du Colombier 	while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
86*368c31abSDavid du Colombier 		trace(TraceProc, "queuewrite sleep");
87*368c31abSDavid du Colombier 		rsleep(&q->full);
88*368c31abSDavid du Colombier 	}
89*368c31abSDavid du Colombier 
90*368c31abSDavid du Colombier 	q->q[q->w].u = u;
91*368c31abSDavid du Colombier 	q->q[q->w].p = p;
92*368c31abSDavid du Colombier 	q->q[q->w].creator = creator;
93*368c31abSDavid du Colombier 	q->q[q->w].ms = ms;
94*368c31abSDavid du Colombier 	q->q[q->w].gen = gen;
95*368c31abSDavid du Colombier 	q->w = (q->w + 1) & (MaxLumpQ - 1);
96*368c31abSDavid du Colombier 
97*368c31abSDavid du Colombier 	trace(TraceProc, "queuewrite wakeup");
98*368c31abSDavid du Colombier 	rwakeup(&q->empty);
99*368c31abSDavid du Colombier 
100*368c31abSDavid du Colombier 	qunlock(&q->lock);
101*368c31abSDavid du Colombier 
102*368c31abSDavid du Colombier 	return 0;
103*368c31abSDavid du Colombier }
104*368c31abSDavid du Colombier 
105*368c31abSDavid du Colombier void
flushqueue(void)106*368c31abSDavid du Colombier flushqueue(void)
107*368c31abSDavid du Colombier {
108*368c31abSDavid du Colombier 	int i;
109*368c31abSDavid du Colombier 	LumpQueue *q;
110*368c31abSDavid du Colombier 
111*368c31abSDavid du Colombier 	if(!lumpqs)
112*368c31abSDavid du Colombier 		return;
113*368c31abSDavid du Colombier 
114*368c31abSDavid du Colombier 	trace(TraceProc, "flushqueue");
115*368c31abSDavid du Colombier 
116*368c31abSDavid du Colombier 	qlock(&glk);
117*368c31abSDavid du Colombier 	gen++;
118*368c31abSDavid du Colombier 	qunlock(&glk);
119*368c31abSDavid du Colombier 
120*368c31abSDavid du Colombier 	for(i=0; i<mainindex->nsects; i++){
121*368c31abSDavid du Colombier 		q = &lumpqs[i];
122*368c31abSDavid du Colombier 		qlock(&q->lock);
123*368c31abSDavid du Colombier 		while(q->w != q->r && gen - q->q[q->r].gen > 0){
124*368c31abSDavid du Colombier 			trace(TraceProc, "flushqueue sleep q%d", i);
125*368c31abSDavid du Colombier 			rsleep(&q->flush);
126*368c31abSDavid du Colombier 		}
127*368c31abSDavid du Colombier 		qunlock(&q->lock);
128*368c31abSDavid du Colombier 	}
129*368c31abSDavid du Colombier }
130*368c31abSDavid du Colombier 
131*368c31abSDavid du Colombier static void
queueproc(void * vq)132*368c31abSDavid du Colombier queueproc(void *vq)
133*368c31abSDavid du Colombier {
134*368c31abSDavid du Colombier 	LumpQueue *q;
135*368c31abSDavid du Colombier 	Lump *u;
136*368c31abSDavid du Colombier 	Packet *p;
137*368c31abSDavid du Colombier 	int creator;
138*368c31abSDavid du Colombier 	uint ms;
139*368c31abSDavid du Colombier 
140*368c31abSDavid du Colombier 	threadsetname("queueproc");
141*368c31abSDavid du Colombier 
142*368c31abSDavid du Colombier 	q = vq;
143*368c31abSDavid du Colombier 	for(;;){
144*368c31abSDavid du Colombier 		qlock(&q->lock);
145*368c31abSDavid du Colombier 		while(q->w == q->r){
146*368c31abSDavid du Colombier 			trace(TraceProc, "queueproc sleep empty");
147*368c31abSDavid du Colombier 			rsleep(&q->empty);
148*368c31abSDavid du Colombier 		}
149*368c31abSDavid du Colombier 
150*368c31abSDavid du Colombier 		u = q->q[q->r].u;
151*368c31abSDavid du Colombier 		p = q->q[q->r].p;
152*368c31abSDavid du Colombier 		creator = q->q[q->r].creator;
153*368c31abSDavid du Colombier 		ms = q->q[q->r].ms;
154*368c31abSDavid du Colombier 
155*368c31abSDavid du Colombier 		q->r = (q->r + 1) & (MaxLumpQ - 1);
156*368c31abSDavid du Colombier 		trace(TraceProc, "queueproc wakeup flush");
157*368c31abSDavid du Colombier 		rwakeupall(&q->flush);
158*368c31abSDavid du Colombier 
159*368c31abSDavid du Colombier 		trace(TraceProc, "queueproc wakeup full");
160*368c31abSDavid du Colombier 		rwakeup(&q->full);
161*368c31abSDavid du Colombier 
162*368c31abSDavid du Colombier 		qunlock(&q->lock);
163*368c31abSDavid du Colombier 
164*368c31abSDavid du Colombier 		trace(TraceProc, "queueproc writelump %V", u->score);
165*368c31abSDavid du Colombier 		if(writeqlump(u, p, creator, ms) < 0)
166*368c31abSDavid du Colombier 			fprint(2, "failed to write lump for %V: %r", u->score);
167*368c31abSDavid du Colombier 		trace(TraceProc, "queueproc wrotelump %V", u->score);
168*368c31abSDavid du Colombier 
169*368c31abSDavid du Colombier 		putlump(u);
170*368c31abSDavid du Colombier 	}
171*368c31abSDavid du Colombier }
172