1 /*
2 * Archiver. In charge of sending blocks to Venti.
3 */
4
5 #include "stdinc.h"
6 #include "dat.h"
7 #include "fns.h"
8 #include "error.h"
9
10 #include "9.h" /* for consPrint */
11
/*
 * Debug verbosity.  archWalk traces when DEBUG is nonzero;
 * ventiSend traces only when DEBUG > 1.
 */
#define DEBUG 0

/* Entry point of the archiver proc that archInit starts. */
static void archThread(void*);
15
/*
 * State of one archiver: the handles it works with, plus the
 * lock and rendezvous points shared between the archiver proc
 * (archThread) and its callers (archKick, archFree).
 */
struct Arch
{
	int ref;		/* set to 2 by archInit; archThread drops one on exit; archFree waits while ref > 1 */
	uint blockSize;		/* diskBlockSize(disk), captured by archInit */
	uint diskSize;		/* neither set nor read in the code visible here */
	Cache *c;		/* cache used for superGet and cacheLocalData */
	Fs *fs;			/* archThread holds fs->elk while updating the super block */
	VtConn *z;		/* connection passed to vtwrite and vtsync */

	QLock lk;		/* lock associated with starve and die */
	Rendez starve;		/* archThread sleeps here when it finds nothing to archive */
	Rendez die;		/* die.l != nil tells archThread to exit; archFree sleeps here */
};
29
30 Arch *
archInit(Cache * c,Disk * disk,Fs * fs,VtConn * z)31 archInit(Cache *c, Disk *disk, Fs *fs, VtConn *z)
32 {
33 Arch *a;
34
35 a = vtmallocz(sizeof(Arch));
36
37 a->c = c;
38 a->z = z;
39 a->fs = fs;
40 a->blockSize = diskBlockSize(disk);
41 a->starve.l = &a->lk;
42
43 a->ref = 2;
44 proccreate(archThread, a, STACK);
45
46 return a;
47 }
48
49 void
archFree(Arch * a)50 archFree(Arch *a)
51 {
52 /* kill slave */
53 qlock(&a->lk);
54 a->die.l = &a->lk;
55 rwakeup(&a->starve);
56 while(a->ref > 1)
57 rsleep(&a->die);
58 qunlock(&a->lk);
59 vtfree(a);
60 }
61
62 static int
ventiSend(Arch * a,Block * b,uchar * data)63 ventiSend(Arch *a, Block *b, uchar *data)
64 {
65 uint n;
66 uchar score[VtScoreSize];
67
68 if(DEBUG > 1)
69 fprint(2, "ventiSend: sending %#ux %L to venti\n", b->addr, &b->l);
70 n = vtzerotruncate(vtType[b->l.type], data, a->blockSize);
71 if(DEBUG > 1)
72 fprint(2, "ventiSend: truncate %d to %d\n", a->blockSize, n);
73 if(vtwrite(a->z, score, vtType[b->l.type], data, n) < 0){
74 fprint(2, "ventiSend: vtwrite block %#ux failed: %r\n", b->addr);
75 return 0;
76 }
77 if(vtsha1check(score, data, n) < 0){
78 uchar score2[VtScoreSize];
79 vtsha1(score2, data, n);
80 fprint(2, "ventiSend: vtwrite block %#ux failed vtsha1check %V %V\n",
81 b->addr, score, score2);
82 return 0;
83 }
84 if(vtsync(a->z) < 0)
85 return 0;
86 return 1;
87 }
88
89 /*
90 * parameters for recursion; there are so many,
91 * and some only change occasionally. this is
92 * easier than spelling things out at each call.
93 */
typedef struct Param Param;
/*
 * State shared by every archWalk call made while archiving one
 * snapshot.  archThread zeroes it and fills in the fixed fields
 * before the top-level call.
 */
struct Param
{
	/* these never change */
	uint snapEpoch;	/* epoch for snapshot being archived */
	uint blockSize;	/* copied from Arch.blockSize by archThread */
	Cache *c;
	Arch *a;

	/* changes on every call */
	uint depth;	/* current recursion depth in archWalk */

	/* statistics */
	uint nfixed;	/* not updated in the code visible here */
	uint nsend;	/* blocks successfully sent by ventiSend */
	uint nvisit;	/* archWalk invocations */
	uint nfailsend;	/* ventiSend failures */
	uint maxdepth;	/* largest value depth has reached */
	uint nreclaim;	/* not updated in the code visible here */
	uint nfake;	/* blocks sent from a modified copy */
	uint nreal;	/* blocks sent unmodified and then marked BsVenti */

	/* these occasionally change (must save old values and put back) */
	uint dsize;
	uint psize;

	/* return value; avoids using stack space */
	Label l;	/* label of the block archWalk just finished */
	uchar score[VtScoreSize];	/* score archWalk computed for that block */
};
124
125 static void
shaBlock(uchar score[VtScoreSize],Block * b,uchar * data,uint bsize)126 shaBlock(uchar score[VtScoreSize], Block *b, uchar *data, uint bsize)
127 {
128 vtsha1(score, data, vtzerotruncate(vtType[b->l.type], data, bsize));
129 }
130
131 static uint
etype(Entry * e)132 etype(Entry *e)
133 {
134 uint t;
135
136 if(e->flags&_VtEntryDir)
137 t = BtDir;
138 else
139 t = BtData;
140 return t+e->depth;
141 }
142
143 static uchar*
copyBlock(Block * b,u32int blockSize)144 copyBlock(Block *b, u32int blockSize)
145 {
146 uchar *data;
147
148 data = vtmalloc(blockSize);
149 if(data == nil)
150 return nil;
151 memmove(data, b->data, blockSize);
152 return data;
153 }
154
155 /*
156 * Walk over the block tree, archiving it to Venti.
157 *
158 * We don't archive the snapshots. Instead we zero the
159 * entries in a temporary copy of the block and archive that.
160 *
161 * Return value is:
162 *
163 * ArchFailure some error occurred
164 * ArchSuccess block and all children archived
165 * ArchFaked success, but block or children got copied
166 */
/* Result codes returned by archWalk. */
enum
{
	ArchFailure,	/* some error occurred */
	ArchSuccess,	/* block and all children archived */
	ArchFaked,	/* success, but block or children got copied */
};
/*
 * Archive the block at addr (with the given type and tag) and,
 * recursively, every local block it points to.
 *
 * On ArchSuccess and ArchFaked, p->score holds the score computed
 * for the data that represents this block in Venti.  p->l holds
 * this block's label, except on the label-mismatch path, which
 * leaves p->l untouched.  When something had to be faked, the
 * score is that of a scratch copy and the on-disk block is left
 * unmarked.
 *
 * Note that addr, type and tag are reused as scratch variables
 * for the child pointers once the block itself has been fetched.
 */
static int
archWalk(Param *p, u32int addr, uchar type, u32int tag)
{
	int ret, i, x, psize, dsize;
	uchar *data, score[VtScoreSize];
	Block *b;
	Label l;
	Entry *e;
	WalkPtr w;
	char err[ERRMAX];

	p->nvisit++;

	b = cacheLocalData(p->c, addr, type, tag, OReadWrite,0);
	if(b == nil){
		fprint(2, "archive(%ud, %#ux): cannot find block: %r\n", p->snapEpoch, addr);
		rerrstr(err, sizeof err);
		if(strcmp(err, ELabelMismatch) == 0){
			/* might as well plod on so we write _something_ to Venti */
			memmove(p->score, vtzeroscore, VtScoreSize);
			return ArchFaked;
		}
		return ArchFailure;
	}

	if(DEBUG) fprint(2, "%*sarchive(%ud, %#ux): block label %L\n",
		p->depth*2, "",  p->snapEpoch, b->addr, &b->l);
	p->depth++;
	if(p->depth > p->maxdepth)
		p->maxdepth = p->depth;

	/*
	 * data aliases b->data until something must be faked; from then
	 * on it is a private copy and b itself is no longer modified.
	 */
	data = b->data;
	if((b->l.state&BsVenti) == 0){
		/* not yet in Venti: archive the children first, then this block */
		initWalk(&w, b, b->l.type==BtDir ? p->dsize : p->psize);
		for(i=0; nextWalk(&w, score, &type, &tag, &e); i++){
			if(e){
				if(!(e->flags&VtEntryActive))
					continue;
				if((e->snap && !e->archive)
				|| (e->flags&VtEntryNoArchive)){
					/* zero this entry in a copy rather than archiving what it points to */
					if(0) fprint(2, "snap; faking %#ux\n", b->addr);
					if(data == b->data){
						data = copyBlock(b, p->blockSize);
						if(data == nil){
							ret = ArchFailure;
							goto Out;
						}
						w.data = data;
					}
					memmove(e->score, vtzeroscore, VtScoreSize);
					e->depth = 0;
					e->size = 0;
					e->tag = 0;
					e->flags &= ~VtEntryLocal;
					entryPack(e, data, w.n-1);
					continue;
				}
			}
			/* pointers for which globalToLocal yields NilBlock are not walked */
			addr = globalToLocal(score);
			if(addr == NilBlock)
				continue;
			/* an entry carries its own sizes; save ours and restore after the recursion */
			dsize = p->dsize;
			psize = p->psize;
			if(e){
				p->dsize= e->dsize;
				p->psize = e->psize;
			}
			/* b's lock is dropped for the duration of the recursive walk */
			qunlock(&b->lk);
			x = archWalk(p, addr, type, tag);
			qlock(&b->lk);
			if(e){
				p->dsize = dsize;
				p->psize = psize;
			}
			/* wait until b is back in a clean or dirty state before touching it */
			while(b->iostate != BioClean && b->iostate != BioDirty)
				rsleep(&b->ioready);
			switch(x){
			case ArchFailure:
				fprint(2, "archWalk %#ux failed; ptr is in %#ux offset %d\n",
					addr, b->addr, i);
				ret = ArchFailure;
				goto Out;
			case ArchFaked:
				/*
				 * When we're writing the entry for an archive directory
				 * (like /archive/2003/1215) then even if we've faked
				 * any data, record the score unconditionally.
				 * This way, we will always record the Venti score here.
				 * Otherwise, temporary data or corrupted file system
				 * would cause us to keep holding onto the on-disk
				 * copy of the archive.
				 */
				if(e==nil || !e->archive)
				if(data == b->data){
					if(0) fprint(2, "faked %#ux, faking %#ux (%V)\n", addr, b->addr, p->score);
					data = copyBlock(b, p->blockSize);
					if(data == nil){
						ret = ArchFailure;
						goto Out;
					}
					w.data = data;
				}
				/* fall through */
				if(0) fprint(2, "falling\n");
			case ArchSuccess:
				/* store the child's score (left in p->score) into data */
				if(e){
					memmove(e->score, p->score, VtScoreSize);
					e->flags &= ~VtEntryLocal;
					entryPack(e, data, w.n-1);
				}else
					memmove(data+(w.n-1)*VtScoreSize, p->score, VtScoreSize);
				if(data == b->data){
					blockDirty(b);
					/*
					 * If b is in the active tree, then we need to note that we've
					 * just removed addr from the active tree (replacing it with the
					 * copy we just stored to Venti).  If addr is in other snapshots,
					 * this will close addr but not free it, since it has a non-empty
					 * epoch range.
					 *
					 * If b is in the active tree but has been copied (this can happen
					 * if we get killed at just the right moment), then we will
					 * mistakenly leak its kids.
					 *
					 * The children of an archive directory (e.g., /archive/2004/0604)
					 * are not treated as in the active tree.
					 */
					if((b->l.state&BsCopied)==0 && (e==nil || e->snap==0))
						blockRemoveLink(b, addr, p->l.type, p->l.tag, 0);
				}
				break;
			}
		}

		/* all children handled; now send this block (or its copy) */
		if(!ventiSend(p->a, b, data)){
			p->nfailsend++;
			ret = ArchFailure;
			goto Out;
		}
		p->nsend++;
		if(data != b->data)
			p->nfake++;
		if(data == b->data){	/* not faking it, so update state */
			p->nreal++;
			l = b->l;
			l.state |= BsVenti;
			if(!blockSetLabel(b, &l, 0)){
				ret = ArchFailure;
				goto Out;
			}
		}
	}

	/* hand the score and label back to the caller through p */
	shaBlock(p->score, b, data, p->blockSize);
	if(0) fprint(2, "ventisend %V %p %p %p\n", p->score, data, b->data, w.data);
	ret = data!=b->data ? ArchFaked : ArchSuccess;
	p->l = b->l;
Out:
	/* release the scratch copy, if one was made */
	if(data != b->data)
		vtfree(data);
	p->depth--;
	blockPut(b);
	return ret;
}
337
338 static void
archThread(void * v)339 archThread(void *v)
340 {
341 Arch *a = v;
342 Block *b;
343 Param p;
344 Super super;
345 int ret;
346 u32int addr;
347 uchar rbuf[VtRootSize];
348 VtRoot root;
349
350 threadsetname("arch");
351
352 for(;;){
353 /* look for work */
354 wlock(&a->fs->elk);
355 b = superGet(a->c, &super);
356 if(b == nil){
357 wunlock(&a->fs->elk);
358 fprint(2, "archThread: superGet: %r\n");
359 sleep(60*1000);
360 continue;
361 }
362 addr = super.next;
363 if(addr != NilBlock && super.current == NilBlock){
364 super.current = addr;
365 super.next = NilBlock;
366 superPack(&super, b->data);
367 blockDirty(b);
368 }else
369 addr = super.current;
370 blockPut(b);
371 wunlock(&a->fs->elk);
372
373 if(addr == NilBlock){
374 /* wait for work */
375 qlock(&a->lk);
376 rsleep(&a->starve);
377 if(a->die.l != nil)
378 goto Done;
379 qunlock(&a->lk);
380 continue;
381 }
382
383 sleep(10*1000); /* window of opportunity to provoke races */
384
385 /* do work */
386 memset(&p, 0, sizeof p);
387 p.blockSize = a->blockSize;
388 p.dsize = 3*VtEntrySize; /* root has three Entries */
389 p.c = a->c;
390 p.a = a;
391
392 ret = archWalk(&p, addr, BtDir, RootTag);
393 switch(ret){
394 default:
395 abort();
396 case ArchFailure:
397 fprint(2, "archiveBlock %#ux: %r\n", addr);
398 sleep(60*1000);
399 continue;
400 case ArchSuccess:
401 case ArchFaked:
402 break;
403 }
404
405 if(0) fprint(2, "archiveSnapshot 0x%#ux: maxdepth %ud nfixed %ud"
406 " send %ud nfailsend %ud nvisit %ud"
407 " nreclaim %ud nfake %ud nreal %ud\n",
408 addr, p.maxdepth, p.nfixed,
409 p.nsend, p.nfailsend, p.nvisit,
410 p.nreclaim, p.nfake, p.nreal);
411 if(0) fprint(2, "archiveBlock %V (%ud)\n", p.score, p.blockSize);
412
413 /* tie up vac root */
414 memset(&root, 0, sizeof root);
415 strecpy(root.type, root.type+sizeof root.type, "vac");
416 strecpy(root.name, root.name+sizeof root.name, "fossil");
417 memmove(root.score, p.score, VtScoreSize);
418 memmove(root.prev, super.last, VtScoreSize);
419 root.blocksize = a->blockSize;
420 vtrootpack(&root, rbuf);
421 if(vtwrite(a->z, p.score, VtRootType, rbuf, VtRootSize) < 0
422 || vtsha1check(p.score, rbuf, VtRootSize) < 0){
423 fprint(2, "vtWriteBlock %#ux: %r\n", addr);
424 sleep(60*1000);
425 continue;
426 }
427
428 /* record success */
429 wlock(&a->fs->elk);
430 b = superGet(a->c, &super);
431 if(b == nil){
432 wunlock(&a->fs->elk);
433 fprint(2, "archThread: superGet: %r\n");
434 sleep(60*1000);
435 continue;
436 }
437 super.current = NilBlock;
438 memmove(super.last, p.score, VtScoreSize);
439 superPack(&super, b->data);
440 blockDirty(b);
441 blockPut(b);
442 wunlock(&a->fs->elk);
443
444 consPrint("archive vac:%V\n", p.score);
445 }
446
447 Done:
448 a->ref--;
449 rwakeup(&a->die);
450 qunlock(&a->lk);
451 }
452
453 void
archKick(Arch * a)454 archKick(Arch *a)
455 {
456 if(a == nil){
457 fprint(2, "warning: archKick nil\n");
458 return;
459 }
460 qlock(&a->lk);
461 rwakeup(&a->starve);
462 qunlock(&a->lk);
463 }
464