xref: /plan9/sys/src/cmd/venti/srv/mirrorarenas.c (revision f9e1cf08d3be51592e03e639fc848a68dc31a55e)
1 /*
2  * Mirror one arena partition onto another.
3  * Be careful to copy only new data.
4  */
5 
6 #include "stdinc.h"
7 #include "dat.h"
8 #include "fns.h"
9 
10 Channel *writechan;
11 
12 typedef struct Write Write;
13 struct Write
14 {
15 	uchar *p;
16 	int n;
17 	uvlong o;
18 	int error;
19 };
20 
21 Part *src;
22 Part *dst;
23 int force;
24 int verbose;
25 int dosha1 = 1;
26 char *status;
27 uvlong astart, aend;
28 
/*
 * Print the command synopsis on standard error and terminate
 * all threads with exit status "usage".
 */
void
usage(void)
{
	char *msg;

	msg = "usage: mirrorarenas [-sv] src dst [ranges]\n";
	fprint(2, "%s", msg);
	threadexitsall("usage");
}
35 
36 char *tagged;
37 
38 void
tag(char * fmt,...)39 tag(char *fmt, ...)
40 {
41 	va_list arg;
42 
43 	if(tagged){
44 		free(tagged);
45 		tagged = nil;
46 	}
47 	va_start(arg, fmt);
48 	tagged = vsmprint(fmt, arg);
49 	va_end(arg);
50 }
51 
52 void
chat(char * fmt,...)53 chat(char *fmt, ...)
54 {
55 	va_list arg;
56 
57 	if(tagged){
58 		write(1, tagged, strlen(tagged));
59 		free(tagged);
60 		tagged = nil;
61 	}
62 	va_start(arg, fmt);
63 	vfprint(1, fmt, arg);
64 	va_end(arg);
65 }
66 
67 #pragma varargck argpos tag 1
68 #pragma varargck argpos chat 1
69 
70 
71 int
ereadpart(Part * p,u64int offset,u8int * buf,u32int count)72 ereadpart(Part *p, u64int offset, u8int *buf, u32int count)
73 {
74 	if(readpart(p, offset, buf, count) != count){
75 		chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count);
76 		return -1;
77 	}
78 	return 0;
79 }
80 
81 int
ewritepart(Part * p,u64int offset,u8int * buf,u32int count)82 ewritepart(Part *p, u64int offset, u8int *buf, u32int count)
83 {
84 	if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){
85 		chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count);
86 		return -1;
87 	}
88 	return 0;
89 }
90 
91 /*
92  * Extra proc to do writes to dst, so that we can overlap reading
93  * src with writing dst during copy.  This is an easy factor of two
94  * (almost) in performance.
95  */
96 static Write wsync;
97 static void
writeproc(void * v)98 writeproc(void *v)
99 {
100 	Write *w;
101 
102 	USED(v);
103 	while((w = recvp(writechan)) != nil){
104 		if(w == &wsync)
105 			continue;
106 		if(ewritepart(dst, w->o, w->p, w->n) < 0)
107 			w->error = 1;
108 	}
109 }
110 
111 int
copy(uvlong start,uvlong end,char * what,DigestState * ds)112 copy(uvlong start, uvlong end, char *what, DigestState *ds)
113 {
114 	int i, n;
115 	uvlong o;
116 	static uchar tmp[2][1024*1024];
117 	Write w[2];
118 
119 	assert(start <= end);
120 	assert(astart <= start && start < aend);
121 	assert(astart <= end && end <= aend);
122 
123 	if(verbose && start != end)
124 		chat("%T   copy %,llud-%,llud %s\n", start, end, what);
125 
126 	i = 0;
127 	memset(w, 0, sizeof w);
128 	for(o=start; o<end; o+=n){
129 		if(w[i].error)
130 			goto error;
131 		n = sizeof tmp[i];
132 		if(o+n > end)
133 			n = end - o;
134 		if(ereadpart(src, o, tmp[i], n) < 0)
135 			goto error;
136 		w[i].p = tmp[i];
137 		w[i].o = o;
138 		w[i].n = n;
139 		w[i].error = 0;
140 		sendp(writechan, &w[i]);
141 		if(ds)
142 			sha1(tmp[i], n, nil, ds);
143 		i = 1-i;
144 	}
145 	if(w[i].error)
146 		goto error;
147 
148 	/*
149 	 * wait for queued write to finish
150 	 */
151 	sendp(writechan, &wsync);
152 	i = 1-i;
153 	if(w[i].error)
154 		return -1;
155 	return 0;
156 
157 error:
158 	/*
159 	 * sync with write proc
160 	 */
161 	w[i].p = nil;
162 	w[i].o = 0;
163 	w[i].n = 0;
164 	w[i].error = 0;
165 	sendp(writechan, &w[i]);
166 	return -1;
167 }
168 
169 /* single-threaded, for reference */
170 int
copy1(uvlong start,uvlong end,char * what,DigestState * ds)171 copy1(uvlong start, uvlong end, char *what, DigestState *ds)
172 {
173 	int n;
174 	uvlong o;
175 	static uchar tmp[1024*1024];
176 
177 	assert(start <= end);
178 	assert(astart <= start && start < aend);
179 	assert(astart <= end && end <= aend);
180 
181 	if(verbose && start != end)
182 		chat("%T   copy %,llud-%,llud %s\n", start, end, what);
183 
184 	for(o=start; o<end; o+=n){
185 		n = sizeof tmp;
186 		if(o+n > end)
187 			n = end - o;
188 		if(ereadpart(src, o, tmp, n) < 0)
189 			return -1;
190 		if(ds)
191 			sha1(tmp, n, nil, ds);
192 		if(ewritepart(dst, o, tmp, n) < 0)
193 			return -1;
194 	}
195 	return 0;
196 }
197 
198 int
asha1(Part * p,uvlong start,uvlong end,DigestState * ds)199 asha1(Part *p, uvlong start, uvlong end, DigestState *ds)
200 {
201 	int n;
202 	uvlong o;
203 	static uchar tmp[1024*1024];
204 
205 	if(start == end)
206 		return 0;
207 	assert(start < end);
208 
209 	if(verbose)
210 		chat("%T   sha1 %,llud-%,llud\n", start, end);
211 
212 	for(o=start; o<end; o+=n){
213 		n = sizeof tmp;
214 		if(o+n > end)
215 			n = end - o;
216 		if(ereadpart(p, o, tmp, n) < 0)
217 			return -1;
218 		sha1(tmp, n, nil, ds);
219 	}
220 	return 0;
221 }
222 
223 uvlong
rdown(uvlong a,int b)224 rdown(uvlong a, int b)
225 {
226 	return a-a%b;
227 }
228 
229 uvlong
rup(uvlong a,int b)230 rup(uvlong a, int b)
231 {
232 	if(a%b == 0)
233 		return a;
234 	return a+b-a%b;
235 }
236 
237 void
mirror(Arena * sa,Arena * da)238 mirror(Arena *sa, Arena *da)
239 {
240 	vlong v, si, di, end;
241 	int clumpmax, blocksize, sealed;
242 	static uchar buf[MaxIoSize];
243 	ArenaHead h;
244 	DigestState xds, *ds;
245 	vlong shaoff, base;
246 
247 	base = sa->base;
248 	blocksize = sa->blocksize;
249 	end = sa->base + sa->size;
250 
251 	astart = base - blocksize;
252 	aend = end + blocksize;
253 
254 	tag("%T %s (%,llud-%,llud)\n", sa->name, astart, aend);
255 
256 	if(force){
257 		copy(astart, aend, "all", nil);
258 		return;
259 	}
260 
261 	if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
262 		if(scorecmp(sa->score, da->score) == 0){
263 			if(verbose)
264 				chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
265 			return;
266 		}
267 		chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score);
268 		/* Keep executing; will correct seal if possible. */
269 	}
270 	if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
271 		chat("%T %s: dst is sealed, src is not\n", sa->name);
272 		status = "errors";
273 		return;
274 	}
275 	if(sa->diskstats.used < da->diskstats.used){
276 		chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used);
277 		status = "errors";
278 		return;
279 	}
280 
281 	if(da->clumpmagic != sa->clumpmagic){
282 		/*
283 		 * Write this now to reduce the window in which
284 		 * the head and tail disagree about clumpmagic.
285 		 */
286 		da->clumpmagic = sa->clumpmagic;
287 		memset(buf, 0, sizeof buf);
288 		packarena(da, buf);
289 		if(ewritepart(dst, end, buf, blocksize) < 0)
290 			return;
291 	}
292 
293 	memset(&h, 0, sizeof h);
294 	h.version = da->version;
295 	strcpy(h.name, da->name);
296 	h.blocksize = da->blocksize;
297 	h.size = da->size + 2*da->blocksize;
298 	h.clumpmagic = da->clumpmagic;
299 	memset(buf, 0, sizeof buf);
300 	packarenahead(&h, buf);
301 	if(ewritepart(dst, base - blocksize, buf, blocksize) < 0)
302 		return;
303 
304 	shaoff = 0;
305 	ds = nil;
306 	sealed = sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0;
307 	if(sealed && dosha1){
308 		/* start sha1 state with header */
309 		memset(&xds, 0, sizeof xds);
310 		ds = &xds;
311 		sha1(buf, blocksize, nil, ds);
312 		shaoff = base;
313 	}
314 
315 	if(sa->diskstats.used != da->diskstats.used){
316 		di = base+rdown(da->diskstats.used, blocksize);
317 		si = base+rup(sa->diskstats.used, blocksize);
318 		if(ds && asha1(dst, shaoff, di, ds) < 0)
319 			return;
320 		if(copy(di, si, "data", ds) < 0)
321 			return;
322 		shaoff = si;
323 	}
324 
325 	clumpmax = sa->clumpmax;
326 	di = end - da->diskstats.clumps/clumpmax * blocksize;
327 	si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize;
328 
329 	if(sa->diskstats.sealed){
330 		/*
331 		 * might be a small hole between the end of the
332 		 * data and the beginning of the directory.
333 		 */
334 		v = base+rup(sa->diskstats.used, blocksize);
335 		if(ds && asha1(dst, shaoff, v, ds) < 0)
336 			return;
337 		if(copy(v, si, "hole", ds) < 0)
338 			return;
339 		shaoff = si;
340 	}
341 
342 	if(da->diskstats.clumps != sa->diskstats.clumps){
343 		if(ds && asha1(dst, shaoff, si, ds) < 0)
344 			return;
345 		if(copy(si, di, "directory", ds) < 0)	/* si < di  because clumpinfo blocks grow down */
346 			return;
347 		shaoff = di;
348 	}
349 
350 	da->ctime = sa->ctime;
351 	da->wtime = sa->wtime;
352 	da->diskstats = sa->diskstats;
353 	da->diskstats.sealed = 0;
354 
355 	/*
356 	 * Repack the arena tail information
357 	 * and save it for next time...
358 	 */
359 	memset(buf, 0, sizeof buf);
360 	packarena(da, buf);
361 	if(ewritepart(dst, end, buf, blocksize) < 0)
362 		return;
363 
364 	if(sealed){
365 		/*
366 		 * ... but on the final pass, copy the encoding
367 		 * of the tail information from the source
368 		 * arena itself.  There are multiple possible
369 		 * ways to write the tail info out (the exact
370 		 * details have changed as venti went through
371 		 * revisions), and to keep the SHA1 hash the
372 		 * same, we have to use what the disk uses.
373 		 */
374 		if(asha1(dst, shaoff, end, ds) < 0
375 		|| copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0)
376 			return;
377 		if(dosha1){
378 			memset(buf, 0, VtScoreSize);
379 			sha1(buf, VtScoreSize, da->score, ds);
380 			if(scorecmp(sa->score, da->score) == 0){
381 				if(verbose)
382 					chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
383 				if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0)
384 					return;
385 			}else{
386 				chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score);
387 				memset(&xds, 0, sizeof xds);
388 				asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds);
389 				sha1(buf, VtScoreSize, 0, &xds);
390 				chat("%T   reseal: %V\n", da->score);
391 				status = "errors";
392 			}
393 		}else{
394 			if(verbose)
395 				chat("%T %s: %V mirrored\n", sa->name, sa->score);
396 			if(ewritepart(dst, end+blocksize-VtScoreSize, sa->score, VtScoreSize) < 0)
397 				return;
398 		}
399 	}else{
400 		chat("%T %s: %,lld used mirrored\n",
401 			sa->name, sa->diskstats.used);
402 	}
403 }
404 
405 void
mirrormany(ArenaPart * sp,ArenaPart * dp,char * range)406 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range)
407 {
408 	int i, lo, hi;
409 	char *s, *t;
410 	Arena *sa, *da;
411 
412 	if(range == nil){
413 		for(i=0; i<sp->narenas; i++){
414 			sa = sp->arenas[i];
415 			da = dp->arenas[i];
416 			mirror(sa, da);
417 		}
418 		return;
419 	}
420 	if(strcmp(range, "none") == 0)
421 		return;
422 
423 	for(s=range; *s; s=t){
424 		t = strchr(s, ',');
425 		if(t)
426 			*t++ = 0;
427 		else
428 			t = s+strlen(s);
429 		if(*s == '-')
430 			lo = 0;
431 		else
432 			lo = strtol(s, &s, 0);
433 		hi = lo;
434 		if(*s == '-'){
435 			s++;
436 			if(*s == 0)
437 				hi = sp->narenas-1;
438 			else
439 				hi = strtol(s, &s, 0);
440 		}
441 		if(*s != 0){
442 			chat("%T bad arena range: %s\n", s);
443 			continue;
444 		}
445 		for(i=lo; i<=hi; i++){
446 			sa = sp->arenas[i];
447 			da = dp->arenas[i];
448 			mirror(sa, da);
449 		}
450 	}
451 }
452 
453 
454 void
threadmain(int argc,char ** argv)455 threadmain(int argc, char **argv)
456 {
457 	int i;
458 	Arena *sa, *da;
459 	ArenaPart *s, *d;
460 	char *ranges;
461 
462 	ventifmtinstall();
463 
464 	ARGBEGIN{
465 	case 'F':
466 		force = 1;
467 		break;
468 	case 'v':
469 		verbose++;
470 		break;
471 	case 's':
472 		dosha1 = 0;
473 		break;
474 	default:
475 		usage();
476 	}ARGEND
477 
478 	if(argc != 2 && argc != 3)
479 		usage();
480 	ranges = nil;
481 	if(argc == 3)
482 		ranges = argv[2];
483 
484 	if((src = initpart(argv[0], OREAD)) == nil)
485 		sysfatal("initpart %s: %r", argv[0]);
486 	if((dst = initpart(argv[1], ORDWR)) == nil)
487 		sysfatal("initpart %s: %r", argv[1]);
488 	if((s = initarenapart(src)) == nil)
489 		sysfatal("initarenapart %s: %r", argv[0]);
490 	for(i=0; i<s->narenas; i++)
491 		delarena(s->arenas[i]);
492 	if((d = initarenapart(dst)) == nil)
493 		sysfatal("loadarenapart %s: %r", argv[1]);
494 	for(i=0; i<d->narenas; i++)
495 		delarena(d->arenas[i]);
496 
497 	/*
498 	 * The arena geometries must match or all bets are off.
499 	 */
500 	if(s->narenas != d->narenas)
501 		sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas);
502 	for(i=0; i<s->narenas; i++){
503 		sa = s->arenas[i];
504 		da = d->arenas[i];
505 		if(sa->version != da->version)
506 			sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version);
507 		if(sa->blocksize != da->blocksize)
508 			sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize);
509 		if(sa->size != da->size)
510 			sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size);
511 		if(strcmp(sa->name, da->name) != 0)
512 			sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name);
513 	}
514 
515 	/*
516 	 * Mirror one arena at a time.
517 	 */
518 	writechan = chancreate(sizeof(void*), 0);
519 	vtproc(writeproc, nil);
520 	mirrormany(s, d, ranges);
521 	sendp(writechan, nil);
522 	threadexitsall(status);
523 }
524