xref: /plan9-contrib/sys/src/cmd/venti/srv/mirrorarenas.c (revision fececb924262ae5acb31c5c448a4a6a523887b15)
1 /*
2  * Mirror one arena partition onto another.
3  * Be careful to copy only new data.
4  */
5 
6 #include "stdinc.h"
7 #include "dat.h"
8 #include "fns.h"
9 
/*
 * Channel used to hand Write requests to writeproc.
 * threadmain creates it with chancreate(sizeof(void*), 0);
 * copy sends Write pointers on it, and threadmain finally
 * sends nil, which ends writeproc's receive loop.
 */
Channel *writechan;
11 
/*
 * One request passed through writechan to writeproc.
 */
typedef struct Write Write;
struct Write
{
	uchar *p;	/* data to be written */
	int n;		/* byte count; writeproc skips requests with n == 0 */
	uvlong o;	/* offset handed to ewritepart for dst */
	int error;	/* set to 1 by writeproc when ewritepart fails */
};
20 
Part *src;		/* source partition; threadmain opens it OREAD */
Part *dst;		/* destination partition; threadmain opens it ORDWR */
int force;		/* set by -F: mirror copies the whole arena without comparing */
int verbose;		/* incremented by -v: enables the extra progress messages */
char *status;		/* passed to threadexitsall; set to "errors" when a problem is found */
uvlong astart, aend;	/* byte range of the arena being mirrored (set in mirror, asserted in copy) */
27 
/*
 * Print the command synopsis on file descriptor 2 and
 * terminate all procs with exit status "usage".
 * The synopsis lists both flags threadmain accepts:
 * -F (force a full copy of each arena) and -v (verbose).
 */
void
usage(void)
{
	fprint(2, "usage: mirrorarenas [-Fv] src dst [ranges]\n");
	threadexitsall("usage");
}
34 
35 char *tagged;
36 
37 void
38 tag(char *fmt, ...)
39 {
40 	va_list arg;
41 
42 	if(tagged){
43 		free(tagged);
44 		tagged = nil;
45 	}
46 	va_start(arg, fmt);
47 	tagged = vsmprint(fmt, arg);
48 	va_end(arg);
49 }
50 
51 void
52 chat(char *fmt, ...)
53 {
54 	va_list arg;
55 
56 	if(tagged){
57 		write(1, tagged, strlen(tagged));
58 		free(tagged);
59 		tagged = nil;
60 	}
61 	va_start(arg, fmt);
62 	vfprint(1, fmt, arg);
63 	va_end(arg);
64 }
65 
66 #pragma varargck argpos tag 1
67 #pragma varargck argpos chat 1
68 
69 
70 int
71 ereadpart(Part *p, u64int offset, u8int *buf, u32int count)
72 {
73 	if(readpart(p, offset, buf, count) != count){
74 		chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count);
75 		return -1;
76 	}
77 	return 0;
78 }
79 
80 int
81 ewritepart(Part *p, u64int offset, u8int *buf, u32int count)
82 {
83 	if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){
84 		chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count);
85 		return -1;
86 	}
87 	return 0;
88 }
89 
90 /*
91  * Extra proc to do writes to dst, so that we can overlap reading
92  * src with writing dst during copy.  This is an easy factor of two
93  * (almost) in performance.
94  */
95 static void
96 writeproc(void *v)
97 {
98 	Write *w;
99 
100 	USED(v);
101 	while((w = recvp(writechan)) != nil){
102 		if(w->n == 0)
103 			continue;
104 		if(ewritepart(dst, w->o, w->p, w->n) < 0)
105 			w->error = 1;
106 	}
107 }
108 
109 int
110 copy(uvlong start, uvlong end, char *what, DigestState *ds)
111 {
112 	int i, n;
113 	uvlong o;
114 	static uchar tmp[2][1024*1024];
115 	Write w[2];
116 
117 	assert(start <= end);
118 	assert(astart <= start && start < aend);
119 	assert(astart <= end && end <= aend);
120 
121 	if(verbose && start != end)
122 		chat("%T   copy %,llud-%,llud %s\n", start, end, what);
123 
124 	i = 0;
125 	memset(w, 0, sizeof w);
126 	for(o=start; o<end; o+=n){
127 		if(w[i].error)
128 			goto error;
129 		n = sizeof tmp[i];
130 		if(o+n > end)
131 			n = end - o;
132 		if(ereadpart(src, o, tmp[i], n) < 0)
133 			goto error;
134 		w[i].p = tmp[i];
135 		w[i].o = o;
136 		w[i].n = n;
137 		w[i].error = 0;
138 		sendp(writechan, &w[i]);
139 		if(ds)
140 			sha1(tmp[i], n, nil, ds);
141 		i = 1-i;
142 	}
143 	if(w[i].error)
144 		goto error;
145 
146 	/*
147 	 * wait for queued write to finish
148 	 */
149 	w[i].p = nil;
150 	w[i].o = 0;
151 	w[i].n = 0;
152 	w[i].error = 0;
153 	sendp(writechan, &w[i]);
154 	i = 1-i;
155 	if(w[i].error)
156 		return -1;
157 	return 0;
158 
159 error:
160 	/*
161 	 * sync with write proc
162 	 */
163 	w[i].p = nil;
164 	w[i].o = 0;
165 	w[i].n = 0;
166 	w[i].error = 0;
167 	sendp(writechan, &w[i]);
168 	return -1;
169 }
170 
171 /* single-threaded, for reference */
172 int
173 copy1(uvlong start, uvlong end, char *what, DigestState *ds)
174 {
175 	int n;
176 	uvlong o;
177 	static uchar tmp[1024*1024];
178 
179 	assert(start <= end);
180 	assert(astart <= start && start < aend);
181 	assert(astart <= end && end <= aend);
182 
183 	if(verbose && start != end)
184 		chat("%T   copy %,llud-%,llud %s\n", start, end, what);
185 
186 	for(o=start; o<end; o+=n){
187 		n = sizeof tmp;
188 		if(o+n > end)
189 			n = end - o;
190 		if(ereadpart(src, o, tmp, n) < 0)
191 			return -1;
192 		if(ds)
193 			sha1(tmp, n, nil, ds);
194 		if(ewritepart(dst, o, tmp, n) < 0)
195 			return -1;
196 	}
197 	return 0;
198 }
199 
200 int
201 asha1(Part *p, uvlong start, uvlong end, DigestState *ds)
202 {
203 	int n;
204 	uvlong o;
205 	static uchar tmp[1024*1024];
206 
207 	if(start == end)
208 		return 0;
209 	assert(start < end);
210 
211 	if(verbose)
212 		chat("%T   sha1 %,llud-%,llud\n", start, end);
213 
214 	for(o=start; o<end; o+=n){
215 		n = sizeof tmp;
216 		if(o+n > end)
217 			n = end - o;
218 		if(ereadpart(p, o, tmp, n) < 0)
219 			return -1;
220 		sha1(tmp, n, nil, ds);
221 	}
222 	return 0;
223 }
224 
225 uvlong
226 rdown(uvlong a, int b)
227 {
228 	return a-a%b;
229 }
230 
231 uvlong
232 rup(uvlong a, int b)
233 {
234 	if(a%b == 0)
235 		return a;
236 	return a+b-a%b;
237 }
238 
239 void
240 mirror(Arena *sa, Arena *da)
241 {
242 	vlong v, si, di, end;
243 	int clumpmax, blocksize;
244 	static uchar buf[MaxIoSize];
245 	ArenaHead h;
246 	DigestState xds, *ds;
247 	vlong shaoff, base;
248 
249 	base = sa->base;
250 	blocksize = sa->blocksize;
251 	end = sa->base + sa->size;
252 
253 	astart = base - blocksize;
254 	aend = end + blocksize;
255 
256 	tag("%T %s (%,llud-%,llud)\n", sa->name, astart, aend);
257 
258 	if(force){
259 		copy(astart, aend, "all", nil);
260 		return;
261 	}
262 
263 	if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
264 		if(scorecmp(sa->score, da->score) == 0){
265 			if(verbose)
266 				chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
267 			return;
268 		}
269 		chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score);
270 		/* Keep executing; will correct seal if possible. */
271 	}
272 	if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
273 		chat("%T %s: dst is sealed, src is not\n", sa->name);
274 		status = "errors";
275 		return;
276 	}
277 	if(sa->diskstats.used < da->diskstats.used){
278 		chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used);
279 		status = "errors";
280 		return;
281 	}
282 
283 	if(da->clumpmagic != sa->clumpmagic){
284 		/*
285 		 * Write this now to reduce the window in which
286 		 * the head and tail disagree about clumpmagic.
287 		 */
288 		da->clumpmagic = sa->clumpmagic;
289 		memset(buf, 0, sizeof buf);
290 		packarena(da, buf);
291 		if(ewritepart(dst, end, buf, blocksize) < 0)
292 			return;
293 	}
294 
295 	memset(&h, 0, sizeof h);
296 	h.version = da->version;
297 	strcpy(h.name, da->name);
298 	h.blocksize = da->blocksize;
299 	h.size = da->size + 2*da->blocksize;
300 	h.clumpmagic = da->clumpmagic;
301 	memset(buf, 0, sizeof buf);
302 	packarenahead(&h, buf);
303 	if(ewritepart(dst, base - blocksize, buf, blocksize) < 0)
304 		return;
305 
306 	shaoff = 0;
307 	ds = nil;
308 	if(sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0){
309 		/* start sha1 state with header */
310 		memset(&xds, 0, sizeof xds);
311 		ds = &xds;
312 		sha1(buf, blocksize, nil, ds);
313 		shaoff = base;
314 	}
315 
316 	if(sa->diskstats.used != da->diskstats.used){
317 		di = base+rdown(da->diskstats.used, blocksize);
318 		si = base+rup(sa->diskstats.used, blocksize);
319 		if(ds && asha1(dst, shaoff, di, ds) < 0)
320 			return;
321 		if(copy(di, si, "data", ds) < 0)
322 			return;
323 		shaoff = si;
324 	}
325 
326 	clumpmax = sa->clumpmax;
327 	di = end - da->diskstats.clumps/clumpmax * blocksize;
328 	si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize;
329 
330 	if(sa->diskstats.sealed){
331 		/*
332 		 * might be a small hole between the end of the
333 		 * data and the beginning of the directory.
334 		 */
335 		v = base+rup(sa->diskstats.used, blocksize);
336 		if(ds && asha1(dst, shaoff, v, ds) < 0)
337 			return;
338 		if(copy(v, si, "hole", ds) < 0)
339 			return;
340 		shaoff = si;
341 	}
342 
343 	if(da->diskstats.clumps != sa->diskstats.clumps){
344 		if(ds && asha1(dst, shaoff, si, ds) < 0)
345 			return;
346 		if(copy(si, di, "directory", ds) < 0)	/* si < di  because clumpinfo blocks grow down */
347 			return;
348 		shaoff = di;
349 	}
350 
351 	da->ctime = sa->ctime;
352 	da->wtime = sa->wtime;
353 	da->diskstats = sa->diskstats;
354 	da->diskstats.sealed = 0;
355 
356 	/*
357 	 * Repack the arena tail information
358 	 * and save it for next time...
359 	 */
360 	memset(buf, 0, sizeof buf);
361 	packarena(da, buf);
362 	if(ewritepart(dst, end, buf, blocksize) < 0)
363 		return;
364 
365 	if(ds){
366 		/*
367 		 * ... but on the final pass, copy the encoding
368 		 * of the tail information from the source
369 		 * arena itself.  There are multiple possible
370 		 * ways to write the tail info out (the exact
371 		 * details have changed as venti went through
372 		 * revisions), and to keep the SHA1 hash the
373 		 * same, we have to use what the disk uses.
374 		 */
375 		if(asha1(dst, shaoff, end, ds) < 0
376 		|| copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0)
377 			return;
378 		memset(buf, 0, VtScoreSize);
379 		sha1(buf, VtScoreSize, da->score, ds);
380 		if(scorecmp(sa->score, da->score) == 0){
381 			if(verbose)
382 				chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
383 			if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0)
384 				return;
385 		}else{
386 			chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score);
387 			memset(&xds, 0, sizeof xds);
388 			asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds);
389 			sha1(buf, VtScoreSize, 0, &xds);
390 			chat("%T   reseal: %V\n", da->score);
391 			status = "errors";
392 		}
393 	}else{
394 		chat("%T %s: %,lld used mirrored\n",
395 			sa->name, sa->diskstats.used);
396 	}
397 }
398 
399 void
400 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range)
401 {
402 	int i, lo, hi;
403 	char *s, *t;
404 	Arena *sa, *da;
405 
406 	if(range == nil){
407 		for(i=0; i<sp->narenas; i++){
408 			sa = sp->arenas[i];
409 			da = dp->arenas[i];
410 			mirror(sa, da);
411 		}
412 		return;
413 	}
414 	if(strcmp(range, "none") == 0)
415 		return;
416 
417 	for(s=range; *s; s=t){
418 		t = strchr(s, ',');
419 		if(t)
420 			*t++ = 0;
421 		else
422 			t = s+strlen(s);
423 		if(*s == '-')
424 			lo = 0;
425 		else
426 			lo = strtol(s, &s, 0);
427 		hi = lo;
428 		if(*s == '-'){
429 			s++;
430 			if(*s == 0)
431 				hi = sp->narenas-1;
432 			else
433 				hi = strtol(s, &s, 0);
434 		}
435 		if(*s != 0){
436 			chat("%T bad arena range: %s\n", s);
437 			continue;
438 		}
439 		for(i=lo; i<=hi; i++){
440 			sa = sp->arenas[i];
441 			da = dp->arenas[i];
442 			mirror(sa, da);
443 		}
444 	}
445 }
446 
447 
448 void
449 threadmain(int argc, char **argv)
450 {
451 	int i;
452 	Arena *sa, *da;
453 	ArenaPart *s, *d;
454 	char *ranges;
455 
456 	ventifmtinstall();
457 
458 	ARGBEGIN{
459 	case 'F':
460 		force = 1;
461 		break;
462 	case 'v':
463 		verbose++;
464 		break;
465 	default:
466 		usage();
467 	}ARGEND
468 
469 	if(argc != 2 && argc != 3)
470 		usage();
471 	ranges = nil;
472 	if(argc == 3)
473 		ranges = argv[2];
474 
475 	if((src = initpart(argv[0], OREAD)) == nil)
476 		sysfatal("initpart %s: %r", argv[0]);
477 	if((dst = initpart(argv[1], ORDWR)) == nil)
478 		sysfatal("initpart %s: %r", argv[1]);
479 	if((s = initarenapart(src)) == nil)
480 		sysfatal("initarenapart %s: %r", argv[0]);
481 	for(i=0; i<s->narenas; i++)
482 		delarena(s->arenas[i]);
483 	if((d = initarenapart(dst)) == nil)
484 		sysfatal("loadarenapart %s: %r", argv[1]);
485 	for(i=0; i<d->narenas; i++)
486 		delarena(d->arenas[i]);
487 
488 	/*
489 	 * The arena geometries must match or all bets are off.
490 	 */
491 	if(s->narenas != d->narenas)
492 		sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas);
493 	for(i=0; i<s->narenas; i++){
494 		sa = s->arenas[i];
495 		da = d->arenas[i];
496 		if(sa->version != da->version)
497 			sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version);
498 		if(sa->blocksize != da->blocksize)
499 			sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize);
500 		if(sa->size != da->size)
501 			sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size);
502 		if(strcmp(sa->name, da->name) != 0)
503 			sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name);
504 	}
505 
506 	/*
507 	 * Mirror one arena at a time.
508 	 */
509 	writechan = chancreate(sizeof(void*), 0);
510 	vtproc(writeproc, nil);
511 	mirrormany(s, d, ranges);
512 	sendp(writechan, nil);
513 	threadexitsall(status);
514 }
515