1 /* 2 * Archiver. In charge of sending blocks to Venti. 3 */ 4 5 #include "stdinc.h" 6 #include "dat.h" 7 #include "fns.h" 8 #include "error.h" 9 10 #include "9.h" /* for consPrint */ 11 12 #define DEBUG 0 13 14 static void archThread(void*); 15 16 struct Arch 17 { 18 int ref; 19 uint blockSize; 20 uint diskSize; 21 Cache *c; 22 Fs *fs; 23 VtSession *z; 24 25 VtLock *lk; 26 VtRendez *starve; 27 VtRendez *die; 28 }; 29 30 Arch * 31 archInit(Cache *c, Disk *disk, Fs *fs, VtSession *z) 32 { 33 Arch *a; 34 35 a = vtMemAllocZ(sizeof(Arch)); 36 37 a->c = c; 38 a->z = z; 39 a->fs = fs; 40 a->blockSize = diskBlockSize(disk); 41 a->lk = vtLockAlloc(); 42 a->starve = vtRendezAlloc(a->lk); 43 44 a->ref = 2; 45 vtThread(archThread, a); 46 47 return a; 48 } 49 50 void 51 archFree(Arch *a) 52 { 53 /* kill slave */ 54 vtLock(a->lk); 55 a->die = vtRendezAlloc(a->lk); 56 vtWakeup(a->starve); 57 while(a->ref > 1) 58 vtSleep(a->die); 59 vtUnlock(a->lk); 60 vtRendezFree(a->starve); 61 vtRendezFree(a->die); 62 vtLockFree(a->lk); 63 vtMemFree(a); 64 } 65 66 static int 67 ventiSend(Arch *a, Block *b, uchar *data) 68 { 69 uint n; 70 uchar score[VtScoreSize]; 71 72 if(DEBUG > 1) 73 fprint(2, "ventiSend: sending %#ux %L to venti\n", b->addr, &b->l); 74 n = vtZeroTruncate(vtType[b->l.type], data, a->blockSize); 75 if(DEBUG > 1) 76 fprint(2, "ventiSend: truncate %d to %d\n", a->blockSize, n); 77 if(!vtWrite(a->z, score, vtType[b->l.type], data, n)){ 78 fprint(2, "ventiSend: vtWrite block %#ux failed: %R\n", b->addr); 79 return 0; 80 } 81 if(!vtSha1Check(score, data, n)){ 82 uchar score2[VtScoreSize]; 83 vtSha1(score2, data, n); 84 fprint(2, "ventiSend: vtWrite block %#ux failed vtSha1Check %V %V\n", 85 b->addr, score, score2); 86 return 0; 87 } 88 if(!vtSync(a->z)) 89 return 0; 90 return 1; 91 } 92 93 /* 94 * parameters for recursion; there are so many, 95 * and some only change occasionally. this is 96 * easier than spelling things out at each call. 97 */ 98 typedef struct Param Param; 99 struct Param 100 { 101 /* these never change */ 102 uint snapEpoch; /* epoch for snapshot being archived */ 103 uint blockSize; 104 Cache *c; 105 Arch *a; 106 107 /* changes on every call */ 108 uint depth; 109 110 /* statistics */ 111 uint nfixed; 112 uint nsend; 113 uint nvisit; 114 uint nfailsend; 115 uint maxdepth; 116 uint nreclaim; 117 uint nfake; 118 uint nreal; 119 120 /* these occasionally change (must save old values and put back) */ 121 uint dsize; 122 uint psize; 123 124 /* return value; avoids using stack space */ 125 Label l; 126 uchar score[VtScoreSize]; 127 }; 128 129 static void 130 shaBlock(uchar score[VtScoreSize], Block *b, uchar *data, uint bsize) 131 { 132 vtSha1(score, data, vtZeroTruncate(vtType[b->l.type], data, bsize)); 133 } 134 135 static uint 136 etype(Entry *e) 137 { 138 uint t; 139 140 if(e->flags&VtEntryDir) 141 t = BtDir; 142 else 143 t = BtData; 144 return t+e->depth; 145 } 146 147 static uchar* 148 copyBlock(Block *b, u32int blockSize) 149 { 150 uchar *data; 151 152 data = vtMemAlloc(blockSize); 153 if(data == nil) 154 return nil; 155 memmove(data, b->data, blockSize); 156 return data; 157 } 158 159 /* 160 * Walk over the block tree, archiving it to Venti. 161 * 162 * We don't archive the snapshots. Instead we zero the 163 * entries in a temporary copy of the block and archive that. 164 * 165 * Return value is: 166 * 167 * ArchFailure some error occurred 168 * ArchSuccess block and all children archived 169 * ArchFaked success, but block or children got copied 170 */ 171 enum 172 { 173 ArchFailure, 174 ArchSuccess, 175 ArchFaked, 176 }; 177 static int 178 archWalk(Param *p, u32int addr, uchar type, u32int tag) 179 { 180 int ret, i, x, psize, dsize; 181 uchar *data, score[VtScoreSize]; 182 Block *b; 183 Label l; 184 Entry *e; 185 WalkPtr w; 186 187 p->nvisit++; 188 189 b = cacheLocalData(p->c, addr, type, tag, OReadWrite,0); 190 if(b == nil){ 191 fprint(2, "archive(%ud, %#ux): cannot find block: %R\n", p->snapEpoch, addr); 192 if(strcmp(vtGetError(), ELabelMismatch) == 0){ 193 /* might as well plod on so we write _something_ to Venti */ 194 memmove(p->score, vtZeroScore, VtScoreSize); 195 return ArchFaked; 196 } 197 return ArchFailure; 198 } 199 200 if(DEBUG) fprint(2, "%*sarchive(%ud, %#ux): block label %L\n", 201 p->depth*2, "", p->snapEpoch, b->addr, &b->l); 202 p->depth++; 203 if(p->depth > p->maxdepth) 204 p->maxdepth = p->depth; 205 206 data = b->data; 207 if((b->l.state&BsVenti) == 0){ 208 initWalk(&w, b, b->l.type==BtDir ? p->dsize : p->psize); 209 for(i=0; nextWalk(&w, score, &type, &tag, &e); i++){ 210 if(e){ 211 if(!(e->flags&VtEntryActive)) 212 continue; 213 if((e->snap && !e->archive) 214 || (e->flags&VtEntryNoArchive)){ 215 if(0) fprint(2, "snap; faking %#ux\n", b->addr); 216 if(data == b->data){ 217 data = copyBlock(b, p->blockSize); 218 if(data == nil){ 219 ret = ArchFailure; 220 goto Out; 221 } 222 w.data = data; 223 } 224 memmove(e->score, vtZeroScore, VtScoreSize); 225 e->depth = 0; 226 e->size = 0; 227 e->tag = 0; 228 e->flags &= ~VtEntryLocal; 229 entryPack(e, data, w.n-1); 230 continue; 231 } 232 } 233 addr = globalToLocal(score); 234 if(addr == NilBlock) 235 continue; 236 dsize = p->dsize; 237 psize = p->psize; 238 if(e){ 239 p->dsize= e->dsize; 240 p->psize = e->psize; 241 } 242 vtUnlock(b->lk); 243 x = archWalk(p, addr, type, tag); 244 vtLock(b->lk); 245 if(e){ 246 p->dsize = dsize; 247 p->psize = psize; 248 } 249 while(b->iostate != BioClean && b->iostate != BioDirty) 250 vtSleep(b->ioready); 251 switch(x){ 252 case ArchFailure: 253 fprint(2, "archWalk %#ux failed; ptr is in %#ux offset %d\n", 254 addr, b->addr, i); 255 ret = ArchFailure; 256 goto Out; 257 case ArchFaked: 258 /* 259 * When we're writing the entry for an archive directory 260 * (like /archive/2003/1215) then even if we've faked 261 * any data, record the score unconditionally. 262 * This way, we will always record the Venti score here. 263 * Otherwise, temporary data or corrupted file system 264 * would cause us to keep holding onto the on-disk 265 * copy of the archive. 266 */ 267 if(e==nil || !e->archive) 268 if(data == b->data){ 269 if(0) fprint(2, "faked %#ux, faking %#ux (%V)\n", addr, b->addr, p->score); 270 data = copyBlock(b, p->blockSize); 271 if(data == nil){ 272 ret = ArchFailure; 273 goto Out; 274 } 275 w.data = data; 276 } 277 /* fall through */ 278 if(0) fprint(2, "falling\n"); 279 case ArchSuccess: 280 if(e){ 281 memmove(e->score, p->score, VtScoreSize); 282 e->flags &= ~VtEntryLocal; 283 entryPack(e, data, w.n-1); 284 }else 285 memmove(data+(w.n-1)*VtScoreSize, p->score, VtScoreSize); 286 if(data == b->data){ 287 blockDirty(b); 288 /* 289 * If b is in the active tree, then we need to note that we've 290 * just removed addr from the active tree (replacing it with the 291 * copy we just stored to Venti). If addr is in other snapshots, 292 * this will close addr but not free it, since it has a non-empty 293 * epoch range. 294 * 295 * If b is in the active tree but has been copied (this can happen 296 * if we get killed at just the right moment), then we will 297 * mistakenly leak its kids. 298 * 299 * The children of an archive directory (e.g., /archive/2004/0604) 300 * are not treated as in the active tree. 301 */ 302 if((b->l.state&BsCopied)==0 && (e==nil || e->snap==0)) 303 blockRemoveLink(b, addr, p->l.type, p->l.tag, 0); 304 } 305 break; 306 } 307 } 308 309 if(!ventiSend(p->a, b, data)){ 310 p->nfailsend++; 311 ret = ArchFailure; 312 goto Out; 313 } 314 p->nsend++; 315 if(data != b->data) 316 p->nfake++; 317 if(data == b->data){ /* not faking it, so update state */ 318 p->nreal++; 319 l = b->l; 320 l.state |= BsVenti; 321 if(!blockSetLabel(b, &l, 0)){ 322 ret = ArchFailure; 323 goto Out; 324 } 325 } 326 } 327 328 shaBlock(p->score, b, data, p->blockSize); 329 if(0) fprint(2, "ventisend %V %p %p %p\n", p->score, data, b->data, w.data); 330 ret = data!=b->data ? ArchFaked : ArchSuccess; 331 p->l = b->l; 332 Out: 333 if(data != b->data) 334 vtMemFree(data); 335 p->depth--; 336 blockPut(b); 337 return ret; 338 } 339 340 static void 341 archThread(void *v) 342 { 343 Arch *a = v; 344 Block *b; 345 Param p; 346 Super super; 347 int ret; 348 u32int addr; 349 uchar rbuf[VtRootSize]; 350 VtRoot root; 351 352 vtThreadSetName("arch"); 353 354 for(;;){ 355 /* look for work */ 356 vtLock(a->fs->elk); 357 b = superGet(a->c, &super); 358 if(b == nil){ 359 vtUnlock(a->fs->elk); 360 fprint(2, "archThread: superGet: %R\n"); 361 sleep(60*1000); 362 continue; 363 } 364 addr = super.next; 365 if(addr != NilBlock && super.current == NilBlock){ 366 super.current = addr; 367 super.next = NilBlock; 368 superPack(&super, b->data); 369 blockDirty(b); 370 }else 371 addr = super.current; 372 blockPut(b); 373 vtUnlock(a->fs->elk); 374 375 if(addr == NilBlock){ 376 /* wait for work */ 377 vtLock(a->lk); 378 vtSleep(a->starve); 379 if(a->die != nil) 380 goto Done; 381 vtUnlock(a->lk); 382 continue; 383 } 384 385 sleep(10*1000); /* window of opportunity to provoke races */ 386 387 /* do work */ 388 memset(&p, 0, sizeof p); 389 p.blockSize = a->blockSize; 390 p.dsize = 3*VtEntrySize; /* root has three Entries */ 391 p.c = a->c; 392 p.a = a; 393 394 ret = archWalk(&p, addr, BtDir, RootTag); 395 switch(ret){ 396 default: 397 abort(); 398 case ArchFailure: 399 fprint(2, "archiveBlock %#ux: %R\n", addr); 400 sleep(60*1000); 401 continue; 402 case ArchSuccess: 403 case ArchFaked: 404 break; 405 } 406 407 if(0) fprint(2, "archiveSnapshot 0x%#ux: maxdepth %ud nfixed %ud" 408 " send %ud nfailsend %ud nvisit %ud" 409 " nreclaim %ud nfake %ud nreal %ud\n", 410 addr, p.maxdepth, p.nfixed, 411 p.nsend, p.nfailsend, p.nvisit, 412 p.nreclaim, p.nfake, p.nreal); 413 if(0) fprint(2, "archiveBlock %V (%ud)\n", p.score, p.blockSize); 414 415 /* tie up vac root */ 416 memset(&root, 0, sizeof root); 417 root.version = VtRootVersion; 418 strecpy(root.type, root.type+sizeof root.type, "vac"); 419 strecpy(root.name, root.name+sizeof root.name, "fossil"); 420 memmove(root.score, p.score, VtScoreSize); 421 memmove(root.prev, super.last, VtScoreSize); 422 root.blockSize = a->blockSize; 423 vtRootPack(&root, rbuf); 424 if(!vtWrite(a->z, p.score, VtRootType, rbuf, VtRootSize) 425 || !vtSha1Check(p.score, rbuf, VtRootSize)){ 426 fprint(2, "vtWriteBlock %#ux: %R\n", addr); 427 sleep(60*1000); 428 continue; 429 } 430 431 /* record success */ 432 vtLock(a->fs->elk); 433 b = superGet(a->c, &super); 434 if(b == nil){ 435 vtUnlock(a->fs->elk); 436 fprint(2, "archThread: superGet: %R\n"); 437 sleep(60*1000); 438 continue; 439 } 440 super.current = NilBlock; 441 memmove(super.last, p.score, VtScoreSize); 442 superPack(&super, b->data); 443 blockDirty(b); 444 blockPut(b); 445 vtUnlock(a->fs->elk); 446 447 consPrint("archive vac:%V\n", p.score); 448 } 449 450 Done: 451 a->ref--; 452 vtWakeup(a->die); 453 vtUnlock(a->lk); 454 } 455 456 void 457 archKick(Arch *a) 458 { 459 if(a == nil){ 460 fprint(2, "warning: archKick nil\n"); 461 return; 462 } 463 vtLock(a->lk); 464 vtWakeup(a->starve); 465 vtUnlock(a->lk); 466 } 467