1 /* 2 * Mirror one arena partition onto another. 3 * Be careful to copy only new data. 4 */ 5 6 #include "stdinc.h" 7 #include "dat.h" 8 #include "fns.h" 9 10 Channel *writechan; 11 12 typedef struct Write Write; 13 struct Write 14 { 15 uchar *p; 16 int n; 17 uvlong o; 18 int error; 19 }; 20 21 Part *src; 22 Part *dst; 23 int force; 24 int verbose; 25 char *status; 26 uvlong astart, aend; 27 28 void 29 usage(void) 30 { 31 fprint(2, "usage: mirrorarenas [-v] src dst [ranges]\n"); 32 threadexitsall("usage"); 33 } 34 35 char *tagged; 36 37 void 38 tag(char *fmt, ...) 39 { 40 va_list arg; 41 42 if(tagged){ 43 free(tagged); 44 tagged = nil; 45 } 46 va_start(arg, fmt); 47 tagged = vsmprint(fmt, arg); 48 va_end(arg); 49 } 50 51 void 52 chat(char *fmt, ...) 53 { 54 va_list arg; 55 56 if(tagged){ 57 write(1, tagged, strlen(tagged)); 58 free(tagged); 59 tagged = nil; 60 } 61 va_start(arg, fmt); 62 vfprint(1, fmt, arg); 63 va_end(arg); 64 } 65 66 #pragma varargck argpos tag 1 67 #pragma varargck argpos chat 1 68 69 70 int 71 ereadpart(Part *p, u64int offset, u8int *buf, u32int count) 72 { 73 if(readpart(p, offset, buf, count) != count){ 74 chat("%T readpart %s at %#llux+%ud: %r\n", p->name, offset, count); 75 return -1; 76 } 77 return 0; 78 } 79 80 int 81 ewritepart(Part *p, u64int offset, u8int *buf, u32int count) 82 { 83 if(writepart(p, offset, buf, count) != count || flushpart(p) < 0){ 84 chat("%T writepart %s at %#llux+%ud: %r\n", p->name, offset, count); 85 return -1; 86 } 87 return 0; 88 } 89 90 /* 91 * Extra proc to do writes to dst, so that we can overlap reading 92 * src with writing dst during copy. This is an easy factor of two 93 * (almost) in performance. 94 */ 95 static void 96 writeproc(void *v) 97 { 98 Write *w; 99 100 USED(v); 101 while((w = recvp(writechan)) != nil){ 102 if(w->n == 0) 103 continue; 104 if(ewritepart(dst, w->o, w->p, w->n) < 0) 105 w->error = 1; 106 } 107 } 108 109 int 110 copy(uvlong start, uvlong end, char *what, DigestState *ds) 111 { 112 int i, n; 113 uvlong o; 114 static uchar tmp[2][1024*1024]; 115 Write w[2]; 116 117 assert(start <= end); 118 assert(astart <= start && start < aend); 119 assert(astart <= end && end <= aend); 120 121 if(verbose && start != end) 122 chat("%T copy %,llud-%,llud %s\n", start, end, what); 123 124 i = 0; 125 memset(w, 0, sizeof w); 126 for(o=start; o<end; o+=n){ 127 if(w[i].error) 128 goto error; 129 n = sizeof tmp[i]; 130 if(o+n > end) 131 n = end - o; 132 if(ereadpart(src, o, tmp[i], n) < 0) 133 goto error; 134 w[i].p = tmp[i]; 135 w[i].o = o; 136 w[i].n = n; 137 w[i].error = 0; 138 sendp(writechan, &w[i]); 139 if(ds) 140 sha1(tmp[i], n, nil, ds); 141 i = 1-i; 142 } 143 if(w[i].error) 144 goto error; 145 146 /* 147 * wait for queued write to finish 148 */ 149 w[i].p = nil; 150 w[i].o = 0; 151 w[i].n = 0; 152 w[i].error = 0; 153 sendp(writechan, &w[i]); 154 i = 1-i; 155 if(w[i].error) 156 return -1; 157 return 0; 158 159 error: 160 /* 161 * sync with write proc 162 */ 163 w[i].p = nil; 164 w[i].o = 0; 165 w[i].n = 0; 166 w[i].error = 0; 167 sendp(writechan, &w[i]); 168 return -1; 169 } 170 171 /* single-threaded, for reference */ 172 int 173 copy1(uvlong start, uvlong end, char *what, DigestState *ds) 174 { 175 int n; 176 uvlong o; 177 static uchar tmp[1024*1024]; 178 179 assert(start <= end); 180 assert(astart <= start && start < aend); 181 assert(astart <= end && end <= aend); 182 183 if(verbose && start != end) 184 chat("%T copy %,llud-%,llud %s\n", start, end, what); 185 186 for(o=start; o<end; o+=n){ 187 n = sizeof tmp; 188 if(o+n > end) 189 n = end - o; 190 if(ereadpart(src, o, tmp, n) < 0) 191 return -1; 192 if(ds) 193 sha1(tmp, n, nil, ds); 194 if(ewritepart(dst, o, tmp, n) < 0) 195 return -1; 196 } 197 return 0; 198 } 199 200 int 201 asha1(Part *p, uvlong start, uvlong end, DigestState *ds) 202 { 203 int n; 204 uvlong o; 205 static uchar tmp[1024*1024]; 206 207 if(start == end) 208 return 0; 209 assert(start < end); 210 211 if(verbose) 212 chat("%T sha1 %,llud-%,llud\n", start, end); 213 214 for(o=start; o<end; o+=n){ 215 n = sizeof tmp; 216 if(o+n > end) 217 n = end - o; 218 if(ereadpart(p, o, tmp, n) < 0) 219 return -1; 220 sha1(tmp, n, nil, ds); 221 } 222 return 0; 223 } 224 225 uvlong 226 rdown(uvlong a, int b) 227 { 228 return a-a%b; 229 } 230 231 uvlong 232 rup(uvlong a, int b) 233 { 234 if(a%b == 0) 235 return a; 236 return a+b-a%b; 237 } 238 239 void 240 mirror(Arena *sa, Arena *da) 241 { 242 vlong v, si, di, end; 243 int clumpmax, blocksize; 244 static uchar buf[MaxIoSize]; 245 ArenaHead h; 246 DigestState xds, *ds; 247 vlong shaoff, base; 248 249 base = sa->base; 250 blocksize = sa->blocksize; 251 end = sa->base + sa->size; 252 253 astart = base - blocksize; 254 aend = end + blocksize; 255 256 tag("%T %s (%,llud-%,llud)\n", sa->name, astart, aend); 257 258 if(force){ 259 copy(astart, aend, "all", nil); 260 return; 261 } 262 263 if(sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){ 264 if(scorecmp(sa->score, da->score) == 0){ 265 if(verbose) 266 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score); 267 return; 268 } 269 chat("%T %s: warning: sealed score mismatch %V vs %V\n", sa->name, sa->score, da->score); 270 /* Keep executing; will correct seal if possible. */ 271 } 272 if(!sa->diskstats.sealed && da->diskstats.sealed && scorecmp(da->score, zeroscore) != 0){ 273 chat("%T %s: dst is sealed, src is not\n", sa->name); 274 status = "errors"; 275 return; 276 } 277 if(sa->diskstats.used < da->diskstats.used){ 278 chat("%T %s: src used %,lld < dst used %,lld\n", sa->name, sa->diskstats.used, da->diskstats.used); 279 status = "errors"; 280 return; 281 } 282 283 if(da->clumpmagic != sa->clumpmagic){ 284 /* 285 * Write this now to reduce the window in which 286 * the head and tail disagree about clumpmagic. 287 */ 288 da->clumpmagic = sa->clumpmagic; 289 memset(buf, 0, sizeof buf); 290 packarena(da, buf); 291 if(ewritepart(dst, end, buf, blocksize) < 0) 292 return; 293 } 294 295 memset(&h, 0, sizeof h); 296 h.version = da->version; 297 strcpy(h.name, da->name); 298 h.blocksize = da->blocksize; 299 h.size = da->size + 2*da->blocksize; 300 h.clumpmagic = da->clumpmagic; 301 memset(buf, 0, sizeof buf); 302 packarenahead(&h, buf); 303 if(ewritepart(dst, base - blocksize, buf, blocksize) < 0) 304 return; 305 306 shaoff = 0; 307 ds = nil; 308 if(sa->diskstats.sealed && scorecmp(sa->score, zeroscore) != 0){ 309 /* start sha1 state with header */ 310 memset(&xds, 0, sizeof xds); 311 ds = &xds; 312 sha1(buf, blocksize, nil, ds); 313 shaoff = base; 314 } 315 316 if(sa->diskstats.used != da->diskstats.used){ 317 di = base+rdown(da->diskstats.used, blocksize); 318 si = base+rup(sa->diskstats.used, blocksize); 319 if(ds && asha1(dst, shaoff, di, ds) < 0) 320 return; 321 if(copy(di, si, "data", ds) < 0) 322 return; 323 shaoff = si; 324 } 325 326 clumpmax = sa->clumpmax; 327 di = end - da->diskstats.clumps/clumpmax * blocksize; 328 si = end - (sa->diskstats.clumps+clumpmax-1)/clumpmax * blocksize; 329 330 if(sa->diskstats.sealed){ 331 /* 332 * might be a small hole between the end of the 333 * data and the beginning of the directory. 334 */ 335 v = base+rup(sa->diskstats.used, blocksize); 336 if(ds && asha1(dst, shaoff, v, ds) < 0) 337 return; 338 if(copy(v, si, "hole", ds) < 0) 339 return; 340 shaoff = si; 341 } 342 343 if(da->diskstats.clumps != sa->diskstats.clumps){ 344 if(ds && asha1(dst, shaoff, si, ds) < 0) 345 return; 346 if(copy(si, di, "directory", ds) < 0) /* si < di because clumpinfo blocks grow down */ 347 return; 348 shaoff = di; 349 } 350 351 da->ctime = sa->ctime; 352 da->wtime = sa->wtime; 353 da->diskstats = sa->diskstats; 354 da->diskstats.sealed = 0; 355 356 /* 357 * Repack the arena tail information 358 * and save it for next time... 359 */ 360 memset(buf, 0, sizeof buf); 361 packarena(da, buf); 362 if(ewritepart(dst, end, buf, blocksize) < 0) 363 return; 364 365 if(ds){ 366 /* 367 * ... but on the final pass, copy the encoding 368 * of the tail information from the source 369 * arena itself. There are multiple possible 370 * ways to write the tail info out (the exact 371 * details have changed as venti went through 372 * revisions), and to keep the SHA1 hash the 373 * same, we have to use what the disk uses. 374 */ 375 if(asha1(dst, shaoff, end, ds) < 0 376 || copy(end, end+blocksize-VtScoreSize, "tail", ds) < 0) 377 return; 378 memset(buf, 0, VtScoreSize); 379 sha1(buf, VtScoreSize, da->score, ds); 380 if(scorecmp(sa->score, da->score) == 0){ 381 if(verbose) 382 chat("%T %s: %V sealed mirrored\n", sa->name, sa->score); 383 if(ewritepart(dst, end+blocksize-VtScoreSize, da->score, VtScoreSize) < 0) 384 return; 385 }else{ 386 chat("%T %s: sealing dst: score mismatch: %V vs %V\n", sa->name, sa->score, da->score); 387 memset(&xds, 0, sizeof xds); 388 asha1(dst, base-blocksize, end+blocksize-VtScoreSize, &xds); 389 sha1(buf, VtScoreSize, 0, &xds); 390 chat("%T reseal: %V\n", da->score); 391 status = "errors"; 392 } 393 }else{ 394 chat("%T %s: %,lld used mirrored\n", 395 sa->name, sa->diskstats.used); 396 } 397 } 398 399 void 400 mirrormany(ArenaPart *sp, ArenaPart *dp, char *range) 401 { 402 int i, lo, hi; 403 char *s, *t; 404 Arena *sa, *da; 405 406 if(range == nil){ 407 for(i=0; i<sp->narenas; i++){ 408 sa = sp->arenas[i]; 409 da = dp->arenas[i]; 410 mirror(sa, da); 411 } 412 return; 413 } 414 if(strcmp(range, "none") == 0) 415 return; 416 417 for(s=range; *s; s=t){ 418 t = strchr(s, ','); 419 if(t) 420 *t++ = 0; 421 else 422 t = s+strlen(s); 423 if(*s == '-') 424 lo = 0; 425 else 426 lo = strtol(s, &s, 0); 427 hi = lo; 428 if(*s == '-'){ 429 s++; 430 if(*s == 0) 431 hi = sp->narenas-1; 432 else 433 hi = strtol(s, &s, 0); 434 } 435 if(*s != 0){ 436 chat("%T bad arena range: %s\n", s); 437 continue; 438 } 439 for(i=lo; i<=hi; i++){ 440 sa = sp->arenas[i]; 441 da = dp->arenas[i]; 442 mirror(sa, da); 443 } 444 } 445 } 446 447 448 void 449 threadmain(int argc, char **argv) 450 { 451 int i; 452 Arena *sa, *da; 453 ArenaPart *s, *d; 454 char *ranges; 455 456 ventifmtinstall(); 457 458 ARGBEGIN{ 459 case 'F': 460 force = 1; 461 break; 462 case 'v': 463 verbose++; 464 break; 465 default: 466 usage(); 467 }ARGEND 468 469 if(argc != 2 && argc != 3) 470 usage(); 471 ranges = nil; 472 if(argc == 3) 473 ranges = argv[2]; 474 475 if((src = initpart(argv[0], OREAD)) == nil) 476 sysfatal("initpart %s: %r", argv[0]); 477 if((dst = initpart(argv[1], ORDWR)) == nil) 478 sysfatal("initpart %s: %r", argv[1]); 479 if((s = initarenapart(src)) == nil) 480 sysfatal("initarenapart %s: %r", argv[0]); 481 for(i=0; i<s->narenas; i++) 482 delarena(s->arenas[i]); 483 if((d = initarenapart(dst)) == nil) 484 sysfatal("loadarenapart %s: %r", argv[1]); 485 for(i=0; i<d->narenas; i++) 486 delarena(d->arenas[i]); 487 488 /* 489 * The arena geometries must match or all bets are off. 490 */ 491 if(s->narenas != d->narenas) 492 sysfatal("arena count mismatch: %d vs %d", s->narenas, d->narenas); 493 for(i=0; i<s->narenas; i++){ 494 sa = s->arenas[i]; 495 da = d->arenas[i]; 496 if(sa->version != da->version) 497 sysfatal("arena %d: version mismatch: %d vs %d", i, sa->version, da->version); 498 if(sa->blocksize != da->blocksize) 499 sysfatal("arena %d: blocksize mismatch: %d vs %d", i, sa->blocksize, da->blocksize); 500 if(sa->size != da->size) 501 sysfatal("arena %d: size mismatch: %,lld vs %,lld", i, sa->size, da->size); 502 if(strcmp(sa->name, da->name) != 0) 503 sysfatal("arena %d: name mismatch: %s vs %s", i, sa->name, da->name); 504 } 505 506 /* 507 * Mirror one arena at a time. 508 */ 509 writechan = chancreate(sizeof(void*), 0); 510 vtproc(writeproc, nil); 511 mirrormany(s, d, ranges); 512 sendp(writechan, nil); 513 threadexitsall(status); 514 } 515