xref: /plan9/sys/src/cmd/venti/srv/venti.c (revision 36066be07288900642605e3e4f42ceece4e473a9)
1 #ifdef PLAN9PORT
2 #include <u.h>
3 #include <signal.h>
4 #endif
5 #include "stdinc.h"
6 #include <bio.h>
7 #include "dat.h"
8 #include "fns.h"
9 
10 #include "whack.h"
11 
/*
 * Allocation sizes, in bytes, for the three memory caches, together
 * with the inputs used to size them automatically (see allocbypcnt).
 */
typedef struct Allocs Allocs;
struct Allocs {
	u32int	mem;		/* lump cache; passed to initlumpcache */
	u32int	bcmem;		/* disk block cache; passed to initdcache */
	u32int	icmem;		/* index cache; passed to initicache */
	u32int	stfree;				/* free memory at start */
	uint	mempcnt;	/* -m percentage of stfree to use; 0 when -m not given */
};

int debug;			/* set by -d */
int nofork;			/* set by -d and -s: run ventiserver in the calling proc */
int mainstacksize = 256*1024;	/* NOTE(review): presumably the stack size the thread library gives threadmain — confirm */
VtSrv *ventisrv;		/* listener returned by vtlisten in threadmain */

static void	ventiserver(void*);
27 
28 static ulong
freemem(void)29 freemem(void)
30 {
31 	int nf, pgsize = 0;
32 	uvlong size, userpgs = 0, userused = 0;
33 	char *ln, *sl;
34 	char *fields[2];
35 	Biobuf *bp;
36 
37 	size = 64*1024*1024;
38 	bp = Bopen("#c/swap", OREAD);
39 	if (bp != nil) {
40 		while ((ln = Brdline(bp, '\n')) != nil) {
41 			ln[Blinelen(bp)-1] = '\0';
42 			nf = tokenize(ln, fields, nelem(fields));
43 			if (nf != 2)
44 				continue;
45 			if (strcmp(fields[1], "pagesize") == 0)
46 				pgsize = atoi(fields[0]);
47 			else if (strcmp(fields[1], "user") == 0) {
48 				sl = strchr(fields[0], '/');
49 				if (sl == nil)
50 					continue;
51 				userpgs = atoll(sl+1);
52 				userused = atoll(fields[0]);
53 			}
54 		}
55 		Bterm(bp);
56 		if (pgsize > 0 && userpgs > 0 && userused > 0)
57 			size = (userpgs - userused) * pgsize;
58 	}
59 	/* cap it to keep the size within 32 bits */
60 	if (size >= 3840UL * 1024 * 1024)
61 		size = 3840UL * 1024 * 1024;
62 	return size;
63 }
64 
65 static void
allocminima(Allocs * all)66 allocminima(Allocs *all)			/* enforce minima for sanity */
67 {
68 	if (all->icmem < 6 * 1024 * 1024)
69 		all->icmem = 6 * 1024 * 1024;
70 	if (all->mem < 1024 * 1024 || all->mem == Unspecified)  /* lumps */
71 		all->mem = 1024 * 1024;
72 	if (all->bcmem < 2 * 1024 * 1024)
73 		all->bcmem = 2 * 1024 * 1024;
74 }
75 
76 /* automatic memory allocations sizing per venti(8) guidelines */
77 static Allocs
allocbypcnt(u32int mempcnt,u32int stfree)78 allocbypcnt(u32int mempcnt, u32int stfree)
79 {
80 	u32int avail;
81 	vlong blmsize;
82 	Allocs all;
83 	static u32int free;
84 
85 	all.mem = Unspecified;
86 	all.bcmem = all.icmem = 0;
87 	all.mempcnt = mempcnt;
88 	all.stfree = stfree;
89 
90 	if (free == 0)
91 		free = freemem();
92 	blmsize = stfree - free;
93 	if (blmsize <= 0)
94 		blmsize = 0;
95 	avail = ((vlong)stfree * mempcnt) / 100;
96 	if (blmsize >= avail || (avail -= blmsize) <= (1 + 2 + 6) * 1024 * 1024)
97 		fprint(2, "%s: bloom filter bigger than mem pcnt; "
98 			"resorting to minimum values (9MB total)\n", argv0);
99 	else {
100 		if (avail >= 3840UL * 1024 * 1024)
101 			avail = 3840UL * 1024 * 1024;	/* sanity */
102 		avail /= 2;
103 		all.icmem = avail;
104 		avail /= 3;
105 		all.mem = avail;
106 		all.bcmem = 2 * avail;
107 	}
108 	return all;
109 }
110 
111 /*
112  * we compute default values for allocations,
113  * which can be overridden by (in order):
114  *	configuration file parameters,
115  *	command-line options other than -m, and -m.
116  */
117 static Allocs
sizeallocs(Allocs opt,Config * cfg)118 sizeallocs(Allocs opt, Config *cfg)
119 {
120 	Allocs all;
121 
122 	/* work out sane defaults */
123 	all = allocbypcnt(20, opt.stfree);
124 
125 	/* config file parameters override */
126 	if (cfg->mem && cfg->mem != Unspecified)
127 		all.mem = cfg->mem;
128 	if (cfg->bcmem)
129 		all.bcmem = cfg->bcmem;
130 	if (cfg->icmem)
131 		all.icmem = cfg->icmem;
132 
133 	/* command-line options override */
134 	if (opt.mem && opt.mem != Unspecified)
135 		all.mem = opt.mem;
136 	if (opt.bcmem)
137 		all.bcmem = opt.bcmem;
138 	if (opt.icmem)
139 		all.icmem = opt.icmem;
140 
141 	/* automatic memory sizing? */
142 	if(opt.mempcnt > 0)
143 		all = allocbypcnt(opt.mempcnt, opt.stfree);
144 
145 	allocminima(&all);
146 	return all;
147 }
148 
149 void
usage(void)150 usage(void)
151 {
152 	fprint(2, "usage: venti [-Ldrsw] [-a ventiaddr] [-c config] "
153 "[-h httpaddr] [-m %%mem] [-B blockcachesize] [-C cachesize] [-I icachesize] "
154 "[-W webroot]\n");
155 	threadexitsall("usage");
156 }
157 
158 void
threadmain(int argc,char * argv[])159 threadmain(int argc, char *argv[])
160 {
161 	char *configfile, *haddr, *vaddr, *webroot;
162 	u32int mem, icmem, bcmem, minbcmem, mempcnt, stfree;
163 	Allocs allocs;
164 	Config config;
165 
166 	traceinit();
167 	threadsetname("main");
168 	mempcnt = 0;
169 	vaddr = nil;
170 	haddr = nil;
171 	configfile = nil;
172 	webroot = nil;
173 	mem = Unspecified;
174 	icmem = 0;
175 	bcmem = 0;
176 	ARGBEGIN{
177 	case 'a':
178 		vaddr = EARGF(usage());
179 		break;
180 	case 'B':
181 		bcmem = unittoull(EARGF(usage()));
182 		break;
183 	case 'c':
184 		configfile = EARGF(usage());
185 		break;
186 	case 'C':
187 		mem = unittoull(EARGF(usage()));
188 		break;
189 	case 'D':
190 		settrace(EARGF(usage()));
191 		break;
192 	case 'd':
193 		debug = 1;
194 		nofork = 1;
195 		break;
196 	case 'h':
197 		haddr = EARGF(usage());
198 		break;
199 	case 'm':
200 		mempcnt = atoi(EARGF(usage()));
201 		if (mempcnt <= 0 || mempcnt >= 100)
202 			usage();
203 		break;
204 	case 'I':
205 		icmem = unittoull(EARGF(usage()));
206 		break;
207 	case 'L':
208 		ventilogging = 1;
209 		break;
210 	case 'r':
211 		readonly = 1;
212 		break;
213 	case 's':
214 		nofork = 1;
215 		break;
216 	case 'w':			/* compatibility with old venti */
217 		queuewrites = 1;
218 		break;
219 	case 'W':
220 		webroot = EARGF(usage());
221 		break;
222 	default:
223 		usage();
224 	}ARGEND
225 
226 	if(argc)
227 		usage();
228 
229 	if(!nofork)
230 		rfork(RFNOTEG);
231 
232 #ifdef PLAN9PORT
233 	{
234 		/* sigh - needed to avoid signals when writing to hungup networks */
235 		struct sigaction sa;
236 		memset(&sa, 0, sizeof sa);
237 		sa.sa_handler = SIG_IGN;
238 		sigaction(SIGPIPE, &sa, nil);
239 	}
240 #endif
241 
242 	ventifmtinstall();
243 	trace(TraceQuiet, "venti started");
244 	fprint(2, "%T venti: ");
245 
246 	if(configfile == nil)
247 		configfile = "venti.conf";
248 
249 	/* remember free memory before initventi & loadbloom, for auto-sizing */
250 	stfree = freemem();
251 	fprint(2, "conf...");
252 	if(initventi(configfile, &config) < 0)
253 		sysfatal("can't init server: %r");
254 	/*
255 	 * load bloom filter
256 	 */
257 	if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
258 		sysfatal("can't load bloom filter: %r");
259 
260 	/*
261 	 * size memory allocations; assumes bloom filter is loaded
262 	 */
263 	allocs = sizeallocs((Allocs){mem, bcmem, icmem, stfree, mempcnt},
264 		&config);
265 	mem = allocs.mem;
266 	bcmem = allocs.bcmem;
267 	icmem = allocs.icmem;
268 	fprint(2, "%s: mem %,ud bcmem %,ud icmem %,ud...",
269 		argv0, mem, bcmem, icmem);
270 
271 	/*
272 	 * default other configuration-file parameters
273 	 */
274 	if(haddr == nil)
275 		haddr = config.haddr;
276 	if(vaddr == nil)
277 		vaddr = config.vaddr;
278 	if(vaddr == nil)
279 		vaddr = "tcp!*!venti";
280 	if(webroot == nil)
281 		webroot = config.webroot;
282 	if(queuewrites == 0)
283 		queuewrites = config.queuewrites;
284 
285 	if(haddr){
286 		fprint(2, "httpd %s...", haddr);
287 		if(httpdinit(haddr, webroot) < 0)
288 			fprint(2, "warning: can't start http server: %r");
289 	}
290 	fprint(2, "init...");
291 
292 	/*
293 	 * lump cache
294 	 */
295 	if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n",
296 		mem, mem / (8 * 1024));
297 	initlumpcache(mem, mem / (8 * 1024));
298 
299 	/*
300 	 * index cache
301 	 */
302 	initicache(icmem);
303 	initicachewrite();
304 
305 	/*
306 	 * block cache: need a block for every arena and every process
307 	 */
308 	minbcmem = maxblocksize *
309 		(mainindex->narenas + mainindex->nsects*4 + 16);
310 	if(bcmem < minbcmem)
311 		bcmem = minbcmem;
312 	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
313 	initdcache(bcmem);
314 
315 	if(mainindex->bloom)
316 		startbloomproc(mainindex->bloom);
317 
318 	fprint(2, "sync...");
319 	if(!readonly && syncindex(mainindex) < 0)
320 		sysfatal("can't sync server: %r");
321 
322 	if(!readonly && queuewrites){
323 		fprint(2, "queue...");
324 		if(initlumpqueues(mainindex->nsects) < 0){
325 			fprint(2, "can't initialize lump queues,"
326 				" disabling write queueing: %r");
327 			queuewrites = 0;
328 		}
329 	}
330 
331 	if(initarenasum() < 0)
332 		fprint(2, "warning: can't initialize arena summing process: %r");
333 
334 	fprint(2, "announce %s...", vaddr);
335 	ventisrv = vtlisten(vaddr);
336 	if(ventisrv == nil)
337 		sysfatal("can't announce %s: %r", vaddr);
338 
339 	fprint(2, "serving.\n");
340 	if(nofork)
341 		ventiserver(nil);
342 	else
343 		vtproc(ventiserver, nil);
344 
345 	threadexits(nil);
346 }
347 
348 static void
vtrerror(VtReq * r,char * error)349 vtrerror(VtReq *r, char *error)
350 {
351 	r->rx.msgtype = VtRerror;
352 	r->rx.error = estrdup(error);
353 }
354 
355 static void
ventiserver(void * v)356 ventiserver(void *v)
357 {
358 	Packet *p;
359 	VtReq *r;
360 	char err[ERRMAX];
361 	uint ms;
362 	int cached, ok;
363 
364 	USED(v);
365 	threadsetname("ventiserver");
366 	trace(TraceWork, "start");
367 	while((r = vtgetreq(ventisrv)) != nil){
368 		trace(TraceWork, "finish");
369 		trace(TraceWork, "start request %F", &r->tx);
370 		trace(TraceRpc, "<- %F", &r->tx);
371 		r->rx.msgtype = r->tx.msgtype+1;
372 		addstat(StatRpcTotal, 1);
373 		if(0) print("req (arenas[0]=%p sects[0]=%p) %F\n",
374 			mainindex->arenas[0], mainindex->sects[0], &r->tx);
375 		switch(r->tx.msgtype){
376 		default:
377 			vtrerror(r, "unknown request");
378 			break;
379 		case VtTread:
380 			ms = msec();
381 			r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached);
382 			ms = msec() - ms;
383 			addstat2(StatRpcRead, 1, StatRpcReadTime, ms);
384 			if(r->rx.data == nil){
385 				addstat(StatRpcReadFail, 1);
386 				rerrstr(err, sizeof err);
387 				vtrerror(r, err);
388 			}else{
389 				addstat(StatRpcReadBytes, packetsize(r->rx.data));
390 				addstat(StatRpcReadOk, 1);
391 				if(cached)
392 					addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms);
393 				else
394 					addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms);
395 			}
396 			break;
397 		case VtTwrite:
398 			if(readonly){
399 				vtrerror(r, "read only");
400 				break;
401 			}
402 			p = r->tx.data;
403 			r->tx.data = nil;
404 			addstat(StatRpcWriteBytes, packetsize(p));
405 			ms = msec();
406 			ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms);
407 			ms = msec() - ms;
408 			addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms);
409 
410 			if(ok < 0){
411 				addstat(StatRpcWriteFail, 1);
412 				rerrstr(err, sizeof err);
413 				vtrerror(r, err);
414 			}
415 			break;
416 		case VtTsync:
417 			flushqueue();
418 			flushdcache();
419 			break;
420 		}
421 		trace(TraceRpc, "-> %F", &r->rx);
422 		vtrespond(r);
423 		trace(TraceWork, "start");
424 	}
425 	flushdcache();
426 	flushicache();
427 	threadexitsall(0);
428 }
429