1 /*
2 * Archiver. In charge of sending blocks to Venti.
3 */
4
5 #include "stdinc.h"
6 #include "dat.h"
7 #include "fns.h"
8 #include "error.h"
9
10 #include "9.h" /* for consPrint */
11
12 #define DEBUG 0
13
14 static void archThread(void*);
15
16 struct Arch
17 {
18 int ref;
19 uint blockSize;
20 uint diskSize;
21 Cache *c;
22 Fs *fs;
23 VtSession *z;
24
25 VtLock *lk;
26 VtRendez *starve;
27 VtRendez *die;
28 };
29
30 Arch *
archInit(Cache * c,Disk * disk,Fs * fs,VtSession * z)31 archInit(Cache *c, Disk *disk, Fs *fs, VtSession *z)
32 {
33 Arch *a;
34
35 a = vtMemAllocZ(sizeof(Arch));
36
37 a->c = c;
38 a->z = z;
39 a->fs = fs;
40 a->blockSize = diskBlockSize(disk);
41 a->lk = vtLockAlloc();
42 a->starve = vtRendezAlloc(a->lk);
43
44 a->ref = 2;
45 vtThread(archThread, a);
46
47 return a;
48 }
49
50 void
archFree(Arch * a)51 archFree(Arch *a)
52 {
53 /* kill slave */
54 vtLock(a->lk);
55 a->die = vtRendezAlloc(a->lk);
56 vtWakeup(a->starve);
57 while(a->ref > 1)
58 vtSleep(a->die);
59 vtUnlock(a->lk);
60 vtRendezFree(a->starve);
61 vtRendezFree(a->die);
62 vtLockFree(a->lk);
63 vtMemFree(a);
64 }
65
66 static int
ventiSend(Arch * a,Block * b,uchar * data)67 ventiSend(Arch *a, Block *b, uchar *data)
68 {
69 uint n;
70 uchar score[VtScoreSize];
71
72 if(DEBUG > 1)
73 fprint(2, "ventiSend: sending %#ux %L to venti\n", b->addr, &b->l);
74 n = vtZeroTruncate(vtType[b->l.type], data, a->blockSize);
75 if(DEBUG > 1)
76 fprint(2, "ventiSend: truncate %d to %d\n", a->blockSize, n);
77 if(!vtWrite(a->z, score, vtType[b->l.type], data, n)){
78 fprint(2, "ventiSend: vtWrite block %#ux failed: %R\n", b->addr);
79 return 0;
80 }
81 if(!vtSha1Check(score, data, n)){
82 uchar score2[VtScoreSize];
83 vtSha1(score2, data, n);
84 fprint(2, "ventiSend: vtWrite block %#ux failed vtSha1Check %V %V\n",
85 b->addr, score, score2);
86 return 0;
87 }
88 if(!vtSync(a->z))
89 return 0;
90 return 1;
91 }
92
93 /*
94 * parameters for recursion; there are so many,
95 * and some only change occasionally. this is
96 * easier than spelling things out at each call.
97 */
98 typedef struct Param Param;
99 struct Param
100 {
101 /* these never change */
102 uint snapEpoch; /* epoch for snapshot being archived */
103 uint blockSize;
104 Cache *c;
105 Arch *a;
106
107 /* changes on every call */
108 uint depth;
109
110 /* statistics */
111 uint nfixed;
112 uint nsend;
113 uint nvisit;
114 uint nfailsend;
115 uint maxdepth;
116 uint nreclaim;
117 uint nfake;
118 uint nreal;
119
120 /* these occasionally change (must save old values and put back) */
121 uint dsize;
122 uint psize;
123
124 /* return value; avoids using stack space */
125 Label l;
126 uchar score[VtScoreSize];
127 };
128
129 static void
shaBlock(uchar score[VtScoreSize],Block * b,uchar * data,uint bsize)130 shaBlock(uchar score[VtScoreSize], Block *b, uchar *data, uint bsize)
131 {
132 vtSha1(score, data, vtZeroTruncate(vtType[b->l.type], data, bsize));
133 }
134
135 static uint
etype(Entry * e)136 etype(Entry *e)
137 {
138 uint t;
139
140 if(e->flags&VtEntryDir)
141 t = BtDir;
142 else
143 t = BtData;
144 return t+e->depth;
145 }
146
147 static uchar*
copyBlock(Block * b,u32int blockSize)148 copyBlock(Block *b, u32int blockSize)
149 {
150 uchar *data;
151
152 data = vtMemAlloc(blockSize);
153 if(data == nil)
154 return nil;
155 memmove(data, b->data, blockSize);
156 return data;
157 }
158
159 /*
160 * Walk over the block tree, archiving it to Venti.
161 *
162 * We don't archive the snapshots. Instead we zero the
163 * entries in a temporary copy of the block and archive that.
164 *
165 * Return value is:
166 *
167 * ArchFailure some error occurred
168 * ArchSuccess block and all children archived
169 * ArchFaked success, but block or children got copied
170 */
171 enum
172 {
173 ArchFailure,
174 ArchSuccess,
175 ArchFaked,
176 };
177 static int
archWalk(Param * p,u32int addr,uchar type,u32int tag)178 archWalk(Param *p, u32int addr, uchar type, u32int tag)
179 {
180 int ret, i, x, psize, dsize;
181 uchar *data, score[VtScoreSize];
182 Block *b;
183 Label l;
184 Entry *e;
185 WalkPtr w;
186
187 p->nvisit++;
188
189 b = cacheLocalData(p->c, addr, type, tag, OReadWrite,0);
190 if(b == nil){
191 fprint(2, "archive(%ud, %#ux): cannot find block: %R\n", p->snapEpoch, addr);
192 if(strcmp(vtGetError(), ELabelMismatch) == 0){
193 /* might as well plod on so we write _something_ to Venti */
194 memmove(p->score, vtZeroScore, VtScoreSize);
195 return ArchFaked;
196 }
197 return ArchFailure;
198 }
199
200 if(DEBUG) fprint(2, "%*sarchive(%ud, %#ux): block label %L\n",
201 p->depth*2, "", p->snapEpoch, b->addr, &b->l);
202 p->depth++;
203 if(p->depth > p->maxdepth)
204 p->maxdepth = p->depth;
205
206 data = b->data;
207 if((b->l.state&BsVenti) == 0){
208 initWalk(&w, b, b->l.type==BtDir ? p->dsize : p->psize);
209 for(i=0; nextWalk(&w, score, &type, &tag, &e); i++){
210 if(e){
211 if(!(e->flags&VtEntryActive))
212 continue;
213 if((e->snap && !e->archive)
214 || (e->flags&VtEntryNoArchive)){
215 if(0) fprint(2, "snap; faking %#ux\n", b->addr);
216 if(data == b->data){
217 data = copyBlock(b, p->blockSize);
218 if(data == nil){
219 ret = ArchFailure;
220 goto Out;
221 }
222 w.data = data;
223 }
224 memmove(e->score, vtZeroScore, VtScoreSize);
225 e->depth = 0;
226 e->size = 0;
227 e->tag = 0;
228 e->flags &= ~VtEntryLocal;
229 entryPack(e, data, w.n-1);
230 continue;
231 }
232 }
233 addr = globalToLocal(score);
234 if(addr == NilBlock)
235 continue;
236 dsize = p->dsize;
237 psize = p->psize;
238 if(e){
239 p->dsize= e->dsize;
240 p->psize = e->psize;
241 }
242 vtUnlock(b->lk);
243 x = archWalk(p, addr, type, tag);
244 vtLock(b->lk);
245 if(e){
246 p->dsize = dsize;
247 p->psize = psize;
248 }
249 while(b->iostate != BioClean && b->iostate != BioDirty)
250 vtSleep(b->ioready);
251 switch(x){
252 case ArchFailure:
253 fprint(2, "archWalk %#ux failed; ptr is in %#ux offset %d\n",
254 addr, b->addr, i);
255 ret = ArchFailure;
256 goto Out;
257 case ArchFaked:
258 /*
259 * When we're writing the entry for an archive directory
260 * (like /archive/2003/1215) then even if we've faked
261 * any data, record the score unconditionally.
262 * This way, we will always record the Venti score here.
263 * Otherwise, temporary data or corrupted file system
264 * would cause us to keep holding onto the on-disk
265 * copy of the archive.
266 */
267 if(e==nil || !e->archive)
268 if(data == b->data){
269 if(0) fprint(2, "faked %#ux, faking %#ux (%V)\n", addr, b->addr, p->score);
270 data = copyBlock(b, p->blockSize);
271 if(data == nil){
272 ret = ArchFailure;
273 goto Out;
274 }
275 w.data = data;
276 }
277 /* fall through */
278 if(0) fprint(2, "falling\n");
279 case ArchSuccess:
280 if(e){
281 memmove(e->score, p->score, VtScoreSize);
282 e->flags &= ~VtEntryLocal;
283 entryPack(e, data, w.n-1);
284 }else
285 memmove(data+(w.n-1)*VtScoreSize, p->score, VtScoreSize);
286 if(data == b->data){
287 blockDirty(b);
288 /*
289 * If b is in the active tree, then we need to note that we've
290 * just removed addr from the active tree (replacing it with the
291 * copy we just stored to Venti). If addr is in other snapshots,
292 * this will close addr but not free it, since it has a non-empty
293 * epoch range.
294 *
295 * If b is in the active tree but has been copied (this can happen
296 * if we get killed at just the right moment), then we will
297 * mistakenly leak its kids.
298 *
299 * The children of an archive directory (e.g., /archive/2004/0604)
300 * are not treated as in the active tree.
301 */
302 if((b->l.state&BsCopied)==0 && (e==nil || e->snap==0))
303 blockRemoveLink(b, addr, p->l.type, p->l.tag, 0);
304 }
305 break;
306 }
307 }
308
309 if(!ventiSend(p->a, b, data)){
310 p->nfailsend++;
311 ret = ArchFailure;
312 goto Out;
313 }
314 p->nsend++;
315 if(data != b->data)
316 p->nfake++;
317 if(data == b->data){ /* not faking it, so update state */
318 p->nreal++;
319 l = b->l;
320 l.state |= BsVenti;
321 if(!blockSetLabel(b, &l, 0)){
322 ret = ArchFailure;
323 goto Out;
324 }
325 }
326 }
327
328 shaBlock(p->score, b, data, p->blockSize);
329 if(0) fprint(2, "ventisend %V %p %p %p\n", p->score, data, b->data, w.data);
330 ret = data!=b->data ? ArchFaked : ArchSuccess;
331 p->l = b->l;
332 Out:
333 if(data != b->data)
334 vtMemFree(data);
335 p->depth--;
336 blockPut(b);
337 return ret;
338 }
339
340 static void
archThread(void * v)341 archThread(void *v)
342 {
343 Arch *a = v;
344 Block *b;
345 Param p;
346 Super super;
347 int ret;
348 u32int addr;
349 uchar rbuf[VtRootSize];
350 VtRoot root;
351
352 vtThreadSetName("arch");
353
354 for(;;){
355 /* look for work */
356 vtLock(a->fs->elk);
357 b = superGet(a->c, &super);
358 if(b == nil){
359 vtUnlock(a->fs->elk);
360 fprint(2, "archThread: superGet: %R\n");
361 sleep(60*1000);
362 continue;
363 }
364 addr = super.next;
365 if(addr != NilBlock && super.current == NilBlock){
366 super.current = addr;
367 super.next = NilBlock;
368 superPack(&super, b->data);
369 blockDirty(b);
370 }else
371 addr = super.current;
372 blockPut(b);
373 vtUnlock(a->fs->elk);
374
375 if(addr == NilBlock){
376 /* wait for work */
377 vtLock(a->lk);
378 vtSleep(a->starve);
379 if(a->die != nil)
380 goto Done;
381 vtUnlock(a->lk);
382 continue;
383 }
384
385 sleep(10*1000); /* window of opportunity to provoke races */
386
387 /* do work */
388 memset(&p, 0, sizeof p);
389 p.blockSize = a->blockSize;
390 p.dsize = 3*VtEntrySize; /* root has three Entries */
391 p.c = a->c;
392 p.a = a;
393
394 ret = archWalk(&p, addr, BtDir, RootTag);
395 switch(ret){
396 default:
397 abort();
398 case ArchFailure:
399 fprint(2, "archiveBlock %#ux: %R\n", addr);
400 sleep(60*1000);
401 continue;
402 case ArchSuccess:
403 case ArchFaked:
404 break;
405 }
406
407 if(0) fprint(2, "archiveSnapshot 0x%#ux: maxdepth %ud nfixed %ud"
408 " send %ud nfailsend %ud nvisit %ud"
409 " nreclaim %ud nfake %ud nreal %ud\n",
410 addr, p.maxdepth, p.nfixed,
411 p.nsend, p.nfailsend, p.nvisit,
412 p.nreclaim, p.nfake, p.nreal);
413 if(0) fprint(2, "archiveBlock %V (%ud)\n", p.score, p.blockSize);
414
415 /* tie up vac root */
416 memset(&root, 0, sizeof root);
417 root.version = VtRootVersion;
418 strecpy(root.type, root.type+sizeof root.type, "vac");
419 strecpy(root.name, root.name+sizeof root.name, "fossil");
420 memmove(root.score, p.score, VtScoreSize);
421 memmove(root.prev, super.last, VtScoreSize);
422 root.blockSize = a->blockSize;
423 vtRootPack(&root, rbuf);
424 if(!vtWrite(a->z, p.score, VtRootType, rbuf, VtRootSize)
425 || !vtSha1Check(p.score, rbuf, VtRootSize)){
426 fprint(2, "vtWriteBlock %#ux: %R\n", addr);
427 sleep(60*1000);
428 continue;
429 }
430
431 /* record success */
432 vtLock(a->fs->elk);
433 b = superGet(a->c, &super);
434 if(b == nil){
435 vtUnlock(a->fs->elk);
436 fprint(2, "archThread: superGet: %R\n");
437 sleep(60*1000);
438 continue;
439 }
440 super.current = NilBlock;
441 memmove(super.last, p.score, VtScoreSize);
442 superPack(&super, b->data);
443 blockDirty(b);
444 blockPut(b);
445 vtUnlock(a->fs->elk);
446
447 consPrint("archive vac:%V\n", p.score);
448 }
449
450 Done:
451 a->ref--;
452 vtWakeup(a->die);
453 vtUnlock(a->lk);
454 }
455
456 void
archKick(Arch * a)457 archKick(Arch *a)
458 {
459 if(a == nil){
460 fprint(2, "warning: archKick nil\n");
461 return;
462 }
463 vtLock(a->lk);
464 vtWakeup(a->starve);
465 vtUnlock(a->lk);
466 }
467