1 /* 2 * Mirror one arena partition onto another. 3 * Be careful to copy only new data. 4 */ 5 6 #include "stdinc.h" 7 #include "dat.h" 8 #include "fns.h" 9 10 Channel *writechan; 11 12 typedef struct Write Write; 13 struct Write 14 { 15 uchar *p; 16 int n; 17 uvlong o; 18 int error; 19 }; 20 21 Part *src; 22 Part *dst; 23 int force; 24 int verbose; 25 int dosha1 = 1; 26 char *status; 27 uvlong astart, aend; 28 29 void 30 usage(void) 31 { 32 fprint(2, "usage: mirrorarenas [-sv] src dst [ranges]\n"); 33 threadexitsall("usage"); 34 } 35 36 char *tagged; 37 38 void 39 tag(char *fmt, ...) 40 { 41 va_list arg; 42 43 if(tagged){ 44 free(tagged); 45 tagged = nil; 46 } 47 va_start(arg, fmt); 48 tagged = vsmprint(fmt, arg); 49 va_end(arg); 50 } 51 52 void 53 chat(char *fmt, ...) 54 { 55 va_list arg; 56 57 if(tagged){ 58 write(1, tagged, strlen(tagged)); 59 free(tagged); 60 tagged = nil; 61 } 62 va_start(arg, fmt); 63 vfprint(1, fmt, arg); 64 va_end(arg); 65 } 66 67 #pragma varargck argpos tag 1 68 #pragma varargck argpos chat 1 69 70 71 int 72 ereadpart(Part *p, u64int offset, u8int *buf, u32int count) 73 { 74 if(readpart(p, offset, buf, count) != count){ 75 chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count); 76 return -1; 77 } 78 return 0; 79 } 80 81 int 82 ewritepart(Part *p, u64int offset, u8int *buf, u32int count) 83 { 84 if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){ 85 chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count); 86 return -1; 87 } 88 return 0; 89 } 90 91 /* 92 * Extra proc to do writes to dst, so that we can overlap reading 93 * src with writing dst during copy. This is an easy factor of two 94 * (almost) in performance. 95 */ 96 static Write wsync; 97 static void 98 writeproc(void *v) 99 { 100 Write *w; 101 102 USED(v); 103 while((w = recvp(writechan)) != nil){ 104 if(w == &wsync) 105 continue; 106 if(ewritepart(dst, w->o, w->p, w->n) < 0) 107 w->error = 1; 108 } 109 } 110 111 int 112 copy(uvlong start, uvlong end, char *what, DigestState *ds) 113 { 114 int i, n; 115 uvlong o; 116 static uchar tmp[2][1024*1024]; 117 Write w[2]; 118 119 assert(start <= end); 120 assert(astart <= start && start < aend); 121 assert(astart <= end && end <= aend); 122 123 if(verbose && start != end) 124 chat("%T copy %,llud-%,llud %s\n", start, end, what); 125 126 i = 0; 127 memset(w, 0, sizeof w); 128 for(o=start; o<end; o+=n){ 129 if(w[i].error) 130 goto error; 131 n = sizeof tmp[i]; 132 if(o+n > end) 133 n = end - o; 134 if(ereadpart(src, o, tmp[i], n) < 0) 135 goto error; 136 w[i].p = tmp[i]; 137 w[i].o = o; 138 w[i].n = n; 139 w[i].error = 0; 140 sendp(writechan, &w[i]); 141 if(ds) 142 sha1(tmp[i], n, nil, ds); 143 i = 1-i; 144 } 145 if(w[i].error) 146 goto error; 147 148 /* 149 * wait for queued write to finish 150 */ 151 sendp(writechan, &wsync); 152 i = 1-i; 153 if(w[i].error) 154 return -1; 155 return 0; 156 157 error: 158 /* 159 * sync with write proc 160 */ 161 w[i].p = nil; 162 w[i].o = 0; 163 w[i].n = 0; 164 w[i].error = 0; 165 sendp(writechan, &w[i]); 166 return -1; 167 } 168 169 /* single-threaded, for reference */ 170 int 171 copy1(uvlong start, uvlong end, char *what, DigestState *ds) 172 { 173 int n; 174 uvlong o; 175 static uchar tmp[1024*1024]; 176 177 assert(start <= end); 178 assert(astart <= start && start < aend); 179 assert(astart <= end && end <= aend); 180 181 if(verbose && start != end) 182 chat("%T copy %,llud-%,llud %s\n", start, end, what); 183 184 for(o=start; o<end; o+=n){ 185 n = sizeof tmp; 186 if(o+n > end) 187 n = end - o; 188 if(ereadpart(src, o, tmp, n) < 0) 189 return -1; 190 if(ds) 191 sha1(tmp, n, nil, ds); 192 if(ewritepart(dst, o, tmp, n) < 0) 193 return -1; 194 } 195 return 0; 196 } 197 198 int 199 asha1(Part *p, uvlong start, uvlong end, DigestState *ds) 200 { 201 int n; 202 uvlong o; 203 static uchar tmp[1024*1024]; 204 205 if(start == end) 206 return 0; 207 assert(start < end); 208 209 if(verbose) 210 chat("%T sha1 %,llud-%,llud\n", start, end); 211 212 for(o=start; o<end; o+=n){ 213 n = sizeof tmp; 214 if(o+n > end) 215 n = end - o; 216 if(ereadpart(p, o, tmp, n) < 0) 217 return -1; 218 sha1(tmp, n, nil, ds); 219 } 220 return 0; 221 } 222 223 uvlong 224 rdown(uvlong a, int b) 225 { 226 return a-a%b; 227 } 228 229 uvlong 230 rup(uvlong a, int b) 231 { 232 if(a%b == 0) 233 return a; 234 return a+b-a%b; 235 } 236 237 void 238 mirror(Arena *sa, Arena *da) 239 { 240 vlong v, si, di, end; 241 int clumpmax, blocksize, sealed; 242 static uchar buf[MaxIoSize]; 243 ArenaHead h; 244 DigestState xds, *ds; 245 vlong shaoff, base; 246 247 base = sa->base; 248 blocksize = sa->blocksize; 249 end = sa->base + sa->size; 250 251 astart = base - blocksize; 252 aend = end + blocksize; 253 254 tag("%T %s (%,llud-%,llud)\n", sa->name, astart, aend); 255 256 if(force){ 257 copy(astart, aend, "all", nil); 258 return; 259 } 260 261 if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){ 262 if(scorecmp(sa->score, da->score) == 0){ 263 if(verbose) 264 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score); 265 return; 266 } 267 chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score); 268 /* Keep executing; will correct seal if possible. */ 269 } 270 if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){ 271 chat("%T %s: dst is sealed, src is not\n", sa->name); 272 status = "errors"; 273 return; 274 } 275 if(sa->diskstats.used < da->diskstats.used){ 276 chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used); 277 status = "errors"; 278 return; 279 } 280 281 if(da->clumpmagic != sa->clumpmagic){ 282 /* 283 * Write this now to reduce the window in which 284 * the head and tail disagree about clumpmagic. 285 */ 286 da->clumpmagic = sa->clumpmagic; 287 memset(buf, 0, sizeof buf); 288 packarena(da, buf); 289 if(ewritepart(dst, end, buf, blocksize) < 0) 290 return; 291 } 292 293 memset(&h, 0, sizeof h); 294 h.version = da->version; 295 strcpy(h.name, da->name); 296 h.blocksize = da->blocksize; 297 h.size = da->size + 2*da->blocksize; 298 h.clumpmagic = da->clumpmagic; 299 memset(buf, 0, sizeof buf); 300 packarenahead(&h, buf); 301 if(ewritepart(dst, base - blocksize, buf, blocksize) < 0) 302 return; 303 304 shaoff = 0; 305 ds = nil; 306 sealed = sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0; 307 if(sealed && dosha1){ 308 /* start sha1 state with header */ 309 memset(&xds, 0, sizeof xds); 310 ds = &xds; 311 sha1(buf, blocksize, nil, ds); 312 shaoff = base; 313 } 314 315 if(sa->diskstats.used != da->diskstats.used){ 316 di = base+rdown(da->diskstats.used, blocksize); 317 si = base+rup(sa->diskstats.used, blocksize); 318 if(ds && asha1(dst, shaoff, di, ds) < 0) 319 return; 320 if(copy(di, si, "data", ds) < 0) 321 return; 322 shaoff = si; 323 } 324 325 clumpmax = sa->clumpmax; 326 di = end - da->diskstats.clumps/clumpmax * blocksize; 327 si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize; 328 329 if(sa->diskstats.sealed){ 330 /* 331 * might be a small hole between the end of the 332 * data and the beginning of the directory. 333 */ 334 v = base+rup(sa->diskstats.used, blocksize); 335 if(ds && asha1(dst, shaoff, v, ds) < 0) 336 return; 337 if(copy(v, si, "hole", ds) < 0) 338 return; 339 shaoff = si; 340 } 341 342 if(da->diskstats.clumps != sa->diskstats.clumps){ 343 if(ds && asha1(dst, shaoff, si, ds) < 0) 344 return; 345 if(copy(si, di, "directory", ds) < 0) /* si < di because clumpinfo blocks grow down */ 346 return; 347 shaoff = di; 348 } 349 350 da->ctime = sa->ctime; 351 da->wtime = sa->wtime; 352 da->diskstats = sa->diskstats; 353 da->diskstats.sealed = 0; 354 355 /* 356 * Repack the arena tail information 357 * and save it for next time... 358 */ 359 memset(buf, 0, sizeof buf); 360 packarena(da, buf); 361 if(ewritepart(dst, end, buf, blocksize) < 0) 362 return; 363 364 if(sealed){ 365 /* 366 * ... but on the final pass, copy the encoding 367 * of the tail information from the source 368 * arena itself. There are multiple possible 369 * ways to write the tail info out (the exact 370 * details have changed as venti went through 371 * revisions), and to keep the SHA1 hash the 372 * same, we have to use what the disk uses. 373 */ 374 if(asha1(dst, shaoff, end, ds) < 0 375 || copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0) 376 return; 377 if(dosha1){ 378 memset(buf, 0, VtScoreSize); 379 sha1(buf, VtScoreSize, da->score, ds); 380 if(scorecmp(sa->score, da->score) == 0){ 381 if(verbose) 382 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score); 383 if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0) 384 return; 385 }else{ 386 chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score); 387 memset(&xds, 0, sizeof xds); 388 asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds); 389 sha1(buf, VtScoreSize, 0, &xds); 390 chat("%T reseal: %V\n", da->score); 391 status = "errors"; 392 } 393 }else{ 394 if(verbose) 395 chat("%T %s: %V mirrored\n", sa->name, sa->score); 396 if(ewritepart(dst, end+blocksize-VtScoreSize, sa->score, VtScoreSize) < 0) 397 return; 398 } 399 }else{ 400 chat("%T %s: %,lld used mirrored\n", 401 sa->name, sa->diskstats.used); 402 } 403 } 404 405 void 406 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range) 407 { 408 int i, lo, hi; 409 char *s, *t; 410 Arena *sa, *da; 411 412 if(range == nil){ 413 for(i=0; i<sp->narenas; i++){ 414 sa = sp->arenas[i]; 415 da = dp->arenas[i]; 416 mirror(sa, da); 417 } 418 return; 419 } 420 if(strcmp(range, "none") == 0) 421 return; 422 423 for(s=range; *s; s=t){ 424 t = strchr(s, ','); 425 if(t) 426 *t++ = 0; 427 else 428 t = s+strlen(s); 429 if(*s == '-') 430 lo = 0; 431 else 432 lo = strtol(s, &s, 0); 433 hi = lo; 434 if(*s == '-'){ 435 s++; 436 if(*s == 0) 437 hi = sp->narenas-1; 438 else 439 hi = strtol(s, &s, 0); 440 } 441 if(*s != 0){ 442 chat("%T bad arena range: %s\n", s); 443 continue; 444 } 445 for(i=lo; i<=hi; i++){ 446 sa = sp->arenas[i]; 447 da = dp->arenas[i]; 448 mirror(sa, da); 449 } 450 } 451 } 452 453 454 void 455 threadmain(int argc, char **argv) 456 { 457 int i; 458 Arena *sa, *da; 459 ArenaPart *s, *d; 460 char *ranges; 461 462 ventifmtinstall(); 463 464 ARGBEGIN{ 465 case 'F': 466 force = 1; 467 break; 468 case 'v': 469 verbose++; 470 break; 471 case 's': 472 dosha1 = 0; 473 break; 474 default: 475 usage(); 476 }ARGEND 477 478 if(argc != 2 && argc != 3) 479 usage(); 480 ranges = nil; 481 if(argc == 3) 482 ranges = argv[2]; 483 484 if((src = initpart(argv[0], OREAD)) == nil) 485 sysfatal("initpart %s: %r", argv[0]); 486 if((dst = initpart(argv[1], ORDWR)) == nil) 487 sysfatal("initpart %s: %r", argv[1]); 488 if((s = initarenapart(src)) == nil) 489 sysfatal("initarenapart %s: %r", argv[0]); 490 for(i=0; i<s->narenas; i++) 491 delarena(s->arenas[i]); 492 if((d = initarenapart(dst)) == nil) 493 sysfatal("loadarenapart %s: %r", argv[1]); 494 for(i=0; i<d->narenas; i++) 495 delarena(d->arenas[i]); 496 497 /* 498 * The arena geometries must match or all bets are off. 499 */ 500 if(s->narenas != d->narenas) 501 sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas); 502 for(i=0; i<s->narenas; i++){ 503 sa = s->arenas[i]; 504 da = d->arenas[i]; 505 if(sa->version != da->version) 506 sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version); 507 if(sa->blocksize != da->blocksize) 508 sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize); 509 if(sa->size != da->size) 510 sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size); 511 if(strcmp(sa->name, da->name) != 0) 512 sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name); 513 } 514 515 /* 516 * Mirror one arena at a time. 517 */ 518 writechan = chancreate(sizeof(void*), 0); 519 vtproc(writeproc, nil); 520 mirrormany(s, d, ranges); 521 sendp(writechan, nil); 522 threadexitsall(status); 523 } 524