1 /*
2 * Mirror one arena partition onto another.
3 * Be careful to copy only new data.
4 */
5
6 #include "stdinc.h"
7 #include "dat.h"
8 #include "fns.h"
9
/*
 * Channel on which copy hands Write requests to writeproc.
 * Sending nil tells writeproc to exit.
 */
Channel *writechan;

typedef struct Write Write;
/*
 * One queued write to dst: n bytes starting at p, written at
 * partition offset o.  writeproc sets error if the write fails.
 */
struct Write
{
	uchar *p;
	int n;
	uvlong o;
	int error;
};

Part *src;	/* partition being mirrored from (opened OREAD) */
Part *dst;	/* partition being mirrored to (opened ORDWR) */
int force;	/* -F: copy each selected arena in full, head through tail */
int verbose;	/* -v: report each step via chat */
int dosha1 = 1;	/* cleared by -s: do not hash sealed arenas while mirroring */
char *status;	/* exit status passed to threadexitsall; nil or "errors" */
uvlong astart, aend;	/* extent of the arena being mirrored (head block through tail block); checked by copy's asserts */
28
/*
 * Print the command syntax on standard error and exit all
 * threads with status "usage".  Lists every flag threadmain
 * accepts, including -F (force a full copy).
 */
void
usage(void)
{
	fprint(2, "usage: mirrorarenas [-Fsv] src dst [ranges]\n");
	threadexitsall("usage");
}
35
/*
 * Pending heading text, allocated by tag and printed
 * (then freed) by the next call to chat.
 */
char *tagged;

/*
 * Remember a heading, formatted like print, to be emitted just
 * before the next chat output.  Any earlier heading that was
 * never printed is discarded.
 */
void
tag(char *fmt, ...)
{
	va_list args;

	free(tagged);
	va_start(args, fmt);
	tagged = vsmprint(fmt, args);
	va_end(args);
}
51
52 void
chat(char * fmt,...)53 chat(char *fmt, ...)
54 {
55 va_list arg;
56
57 if(tagged){
58 write(1, tagged, strlen(tagged));
59 free(tagged);
60 tagged = nil;
61 }
62 va_start(arg, fmt);
63 vfprint(1, fmt, arg);
64 va_end(arg);
65 }
66
67 #pragma varargck argpos tag 1
68 #pragma varargck argpos chat 1
69
70
71 int
ereadpart(Part * p,u64int offset,u8int * buf,u32int count)72 ereadpart(Part *p, u64int offset, u8int *buf, u32int count)
73 {
74 if(readpart(p, offset, buf, count) != count){
75 chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count);
76 return -1;
77 }
78 return 0;
79 }
80
81 int
ewritepart(Part * p,u64int offset,u8int * buf,u32int count)82 ewritepart(Part *p, u64int offset, u8int *buf, u32int count)
83 {
84 if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){
85 chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count);
86 return -1;
87 }
88 return 0;
89 }
90
91 /*
92 * Extra proc to do writes to dst, so that we can overlap reading
93 * src with writing dst during copy. This is an easy factor of two
94 * (almost) in performance.
95 */
96 static Write wsync;
97 static void
writeproc(void * v)98 writeproc(void *v)
99 {
100 Write *w;
101
102 USED(v);
103 while((w = recvp(writechan)) != nil){
104 if(w == &wsync)
105 continue;
106 if(ewritepart(dst, w->o, w->p, w->n) < 0)
107 w->error = 1;
108 }
109 }
110
111 int
copy(uvlong start,uvlong end,char * what,DigestState * ds)112 copy(uvlong start, uvlong end, char *what, DigestState *ds)
113 {
114 int i, n;
115 uvlong o;
116 static uchar tmp[2][1024*1024];
117 Write w[2];
118
119 assert(start <= end);
120 assert(astart <= start && start < aend);
121 assert(astart <= end && end <= aend);
122
123 if(verbose && start != end)
124 chat("%T copy %,llud-%,llud %s\n", start, end, what);
125
126 i = 0;
127 memset(w, 0, sizeof w);
128 for(o=start; o<end; o+=n){
129 if(w[i].error)
130 goto error;
131 n = sizeof tmp[i];
132 if(o+n > end)
133 n = end - o;
134 if(ereadpart(src, o, tmp[i], n) < 0)
135 goto error;
136 w[i].p = tmp[i];
137 w[i].o = o;
138 w[i].n = n;
139 w[i].error = 0;
140 sendp(writechan, &w[i]);
141 if(ds)
142 sha1(tmp[i], n, nil, ds);
143 i = 1-i;
144 }
145 if(w[i].error)
146 goto error;
147
148 /*
149 * wait for queued write to finish
150 */
151 sendp(writechan, &wsync);
152 i = 1-i;
153 if(w[i].error)
154 return -1;
155 return 0;
156
157 error:
158 /*
159 * sync with write proc
160 */
161 w[i].p = nil;
162 w[i].o = 0;
163 w[i].n = 0;
164 w[i].error = 0;
165 sendp(writechan, &w[i]);
166 return -1;
167 }
168
169 /* single-threaded, for reference */
170 int
copy1(uvlong start,uvlong end,char * what,DigestState * ds)171 copy1(uvlong start, uvlong end, char *what, DigestState *ds)
172 {
173 int n;
174 uvlong o;
175 static uchar tmp[1024*1024];
176
177 assert(start <= end);
178 assert(astart <= start && start < aend);
179 assert(astart <= end && end <= aend);
180
181 if(verbose && start != end)
182 chat("%T copy %,llud-%,llud %s\n", start, end, what);
183
184 for(o=start; o<end; o+=n){
185 n = sizeof tmp;
186 if(o+n > end)
187 n = end - o;
188 if(ereadpart(src, o, tmp, n) < 0)
189 return -1;
190 if(ds)
191 sha1(tmp, n, nil, ds);
192 if(ewritepart(dst, o, tmp, n) < 0)
193 return -1;
194 }
195 return 0;
196 }
197
198 int
asha1(Part * p,uvlong start,uvlong end,DigestState * ds)199 asha1(Part *p, uvlong start, uvlong end, DigestState *ds)
200 {
201 int n;
202 uvlong o;
203 static uchar tmp[1024*1024];
204
205 if(start == end)
206 return 0;
207 assert(start < end);
208
209 if(verbose)
210 chat("%T sha1 %,llud-%,llud\n", start, end);
211
212 for(o=start; o<end; o+=n){
213 n = sizeof tmp;
214 if(o+n > end)
215 n = end - o;
216 if(ereadpart(p, o, tmp, n) < 0)
217 return -1;
218 sha1(tmp, n, nil, ds);
219 }
220 return 0;
221 }
222
223 uvlong
rdown(uvlong a,int b)224 rdown(uvlong a, int b)
225 {
226 return a-a%b;
227 }
228
229 uvlong
rup(uvlong a,int b)230 rup(uvlong a, int b)
231 {
232 if(a%b == 0)
233 return a;
234 return a+b-a%b;
235 }
236
237 void
mirror(Arena * sa,Arena * da)238 mirror(Arena *sa, Arena *da)
239 {
240 vlong v, si, di, end;
241 int clumpmax, blocksize, sealed;
242 static uchar buf[MaxIoSize];
243 ArenaHead h;
244 DigestState xds, *ds;
245 vlong shaoff, base;
246
247 base = sa->base;
248 blocksize = sa->blocksize;
249 end = sa->base + sa->size;
250
251 astart = base - blocksize;
252 aend = end + blocksize;
253
254 tag("%T %s (%,llud-%,llud)\n", sa->name, astart, aend);
255
256 if(force){
257 copy(astart, aend, "all", nil);
258 return;
259 }
260
261 if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
262 if(scorecmp(sa->score, da->score) == 0){
263 if(verbose)
264 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
265 return;
266 }
267 chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score);
268 /* Keep executing; will correct seal if possible. */
269 }
270 if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){
271 chat("%T %s: dst is sealed, src is not\n", sa->name);
272 status = "errors";
273 return;
274 }
275 if(sa->diskstats.used < da->diskstats.used){
276 chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used);
277 status = "errors";
278 return;
279 }
280
281 if(da->clumpmagic != sa->clumpmagic){
282 /*
283 * Write this now to reduce the window in which
284 * the head and tail disagree about clumpmagic.
285 */
286 da->clumpmagic = sa->clumpmagic;
287 memset(buf, 0, sizeof buf);
288 packarena(da, buf);
289 if(ewritepart(dst, end, buf, blocksize) < 0)
290 return;
291 }
292
293 memset(&h, 0, sizeof h);
294 h.version = da->version;
295 strcpy(h.name, da->name);
296 h.blocksize = da->blocksize;
297 h.size = da->size + 2*da->blocksize;
298 h.clumpmagic = da->clumpmagic;
299 memset(buf, 0, sizeof buf);
300 packarenahead(&h, buf);
301 if(ewritepart(dst, base - blocksize, buf, blocksize) < 0)
302 return;
303
304 shaoff = 0;
305 ds = nil;
306 sealed = sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0;
307 if(sealed && dosha1){
308 /* start sha1 state with header */
309 memset(&xds, 0, sizeof xds);
310 ds = &xds;
311 sha1(buf, blocksize, nil, ds);
312 shaoff = base;
313 }
314
315 if(sa->diskstats.used != da->diskstats.used){
316 di = base+rdown(da->diskstats.used, blocksize);
317 si = base+rup(sa->diskstats.used, blocksize);
318 if(ds && asha1(dst, shaoff, di, ds) < 0)
319 return;
320 if(copy(di, si, "data", ds) < 0)
321 return;
322 shaoff = si;
323 }
324
325 clumpmax = sa->clumpmax;
326 di = end - da->diskstats.clumps/clumpmax * blocksize;
327 si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize;
328
329 if(sa->diskstats.sealed){
330 /*
331 * might be a small hole between the end of the
332 * data and the beginning of the directory.
333 */
334 v = base+rup(sa->diskstats.used, blocksize);
335 if(ds && asha1(dst, shaoff, v, ds) < 0)
336 return;
337 if(copy(v, si, "hole", ds) < 0)
338 return;
339 shaoff = si;
340 }
341
342 if(da->diskstats.clumps != sa->diskstats.clumps){
343 if(ds && asha1(dst, shaoff, si, ds) < 0)
344 return;
345 if(copy(si, di, "directory", ds) < 0) /* si < di because clumpinfo blocks grow down */
346 return;
347 shaoff = di;
348 }
349
350 da->ctime = sa->ctime;
351 da->wtime = sa->wtime;
352 da->diskstats = sa->diskstats;
353 da->diskstats.sealed = 0;
354
355 /*
356 * Repack the arena tail information
357 * and save it for next time...
358 */
359 memset(buf, 0, sizeof buf);
360 packarena(da, buf);
361 if(ewritepart(dst, end, buf, blocksize) < 0)
362 return;
363
364 if(sealed){
365 /*
366 * ... but on the final pass, copy the encoding
367 * of the tail information from the source
368 * arena itself. There are multiple possible
369 * ways to write the tail info out (the exact
370 * details have changed as venti went through
371 * revisions), and to keep the SHA1 hash the
372 * same, we have to use what the disk uses.
373 */
374 if(asha1(dst, shaoff, end, ds) < 0
375 || copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0)
376 return;
377 if(dosha1){
378 memset(buf, 0, VtScoreSize);
379 sha1(buf, VtScoreSize, da->score, ds);
380 if(scorecmp(sa->score, da->score) == 0){
381 if(verbose)
382 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score);
383 if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0)
384 return;
385 }else{
386 chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score);
387 memset(&xds, 0, sizeof xds);
388 asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds);
389 sha1(buf, VtScoreSize, 0, &xds);
390 chat("%T reseal: %V\n", da->score);
391 status = "errors";
392 }
393 }else{
394 if(verbose)
395 chat("%T %s: %V mirrored\n", sa->name, sa->score);
396 if(ewritepart(dst, end+blocksize-VtScoreSize, sa->score, VtScoreSize) < 0)
397 return;
398 }
399 }else{
400 chat("%T %s: %,lld used mirrored\n",
401 sa->name, sa->diskstats.used);
402 }
403 }
404
405 void
mirrormany(ArenaPart * sp,ArenaPart * dp,char * range)406 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range)
407 {
408 int i, lo, hi;
409 char *s, *t;
410 Arena *sa, *da;
411
412 if(range == nil){
413 for(i=0; i<sp->narenas; i++){
414 sa = sp->arenas[i];
415 da = dp->arenas[i];
416 mirror(sa, da);
417 }
418 return;
419 }
420 if(strcmp(range, "none") == 0)
421 return;
422
423 for(s=range; *s; s=t){
424 t = strchr(s, ',');
425 if(t)
426 *t++ = 0;
427 else
428 t = s+strlen(s);
429 if(*s == '-')
430 lo = 0;
431 else
432 lo = strtol(s, &s, 0);
433 hi = lo;
434 if(*s == '-'){
435 s++;
436 if(*s == 0)
437 hi = sp->narenas-1;
438 else
439 hi = strtol(s, &s, 0);
440 }
441 if(*s != 0){
442 chat("%T bad arena range: %s\n", s);
443 continue;
444 }
445 for(i=lo; i<=hi; i++){
446 sa = sp->arenas[i];
447 da = dp->arenas[i];
448 mirror(sa, da);
449 }
450 }
451 }
452
453
454 void
threadmain(int argc,char ** argv)455 threadmain(int argc, char **argv)
456 {
457 int i;
458 Arena *sa, *da;
459 ArenaPart *s, *d;
460 char *ranges;
461
462 ventifmtinstall();
463
464 ARGBEGIN{
465 case 'F':
466 force = 1;
467 break;
468 case 'v':
469 verbose++;
470 break;
471 case 's':
472 dosha1 = 0;
473 break;
474 default:
475 usage();
476 }ARGEND
477
478 if(argc != 2 && argc != 3)
479 usage();
480 ranges = nil;
481 if(argc == 3)
482 ranges = argv[2];
483
484 if((src = initpart(argv[0], OREAD)) == nil)
485 sysfatal("initpart %s: %r", argv[0]);
486 if((dst = initpart(argv[1], ORDWR)) == nil)
487 sysfatal("initpart %s: %r", argv[1]);
488 if((s = initarenapart(src)) == nil)
489 sysfatal("initarenapart %s: %r", argv[0]);
490 for(i=0; i<s->narenas; i++)
491 delarena(s->arenas[i]);
492 if((d = initarenapart(dst)) == nil)
493 sysfatal("loadarenapart %s: %r", argv[1]);
494 for(i=0; i<d->narenas; i++)
495 delarena(d->arenas[i]);
496
497 /*
498 * The arena geometries must match or all bets are off.
499 */
500 if(s->narenas != d->narenas)
501 sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas);
502 for(i=0; i<s->narenas; i++){
503 sa = s->arenas[i];
504 da = d->arenas[i];
505 if(sa->version != da->version)
506 sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version);
507 if(sa->blocksize != da->blocksize)
508 sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize);
509 if(sa->size != da->size)
510 sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size);
511 if(strcmp(sa->name, da->name) != 0)
512 sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name);
513 }
514
515 /*
516 * Mirror one arena at a time.
517 */
518 writechan = chancreate(sizeof(void*), 0);
519 vtproc(writeproc, nil);
520 mirrormany(s, d, ranges);
521 sendp(writechan, nil);
522 threadexitsall(status);
523 }
524