/*
 * Rebuild the index from scratch, in place.
 */
#include "stdinc.h"
#include "dat.h"
#include "fns.h"

enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};

int		dumb;
int		errors;
char		**isect;
int		nisect;
int		bloom;
int		zero;

u32int	isectmem;
u64int	totalbuckets;
u64int	totalclumps;
Channel	*arenadonechan;
Channel	*isectdonechan;
Index	*ix;

u64int	arenaentries;
u64int	skipentries;
u64int	indexentries;

static int	shouldprocess(ISect*);
static void	isectproc(void*);
static void	arenapartproc(void*);

void
usage(void)
{
	fprint(2, "usage: buildindex [-b] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}

void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	maxdisks = 100000;
	ventifmtinstall();
	imem = 256*1024*1024;
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("resetbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
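	/*
	 * dup the read-only fd over the partition's existing
	 * fd, so every later access through p->fd is read-only
	 * and a stray write to an arena is impossible.
	 */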
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntry), 0);
			vtproc(isectproc, ix->sects[i]);
		}
	}

	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
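	/*
	 * One proc per arena partition.  Once -m maxdisks procs
	 * are running, wait for one to finish before starting
	 * the next, to bound how many disks are read at once.
	 */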
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			if(++napart >= maxdisks){
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/* wait for the remaining arena procs to finish */
	for(; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}

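/*
 * Decide whether to build this index section.
 * With no -i flags, build them all; otherwise build only
 * the named sections, consuming each name as it matches
 * so leftovers can be reported as warnings.
 */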
static int
shouldprocess(ISect *is)
{
	int i;

	if(nisect == 0)
		return 1;

	for(i=0; i<nisect; i++)
		if(isect[i] && strcmp(isect[i], is->name) == 0){
			isect[i] = nil;
			return 1;
		}
	return 0;
}

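/*
 * Locked accumulator: the arena and index procs all update
 * the global entry counts through here.
 */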
static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}

/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section.  When finished, send on
 * arenadonechan.
 */
enum
{
	ClumpChunks = 32*1024,
};
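/*
 * Clump directory entries are read in batches of ClumpChunks
 * to amortize the per-read cost on the arena partition.
 */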
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;

	p = v;
	threadsetname("arenaproc %s", p->name);

	nskip = 0;
	tot = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan)
						send(ix->sects[x]->writechan, &ie);
					if(ix->bloom)
						markbloomfilter(ix->bloom, ie.score);
				}
			}
		}
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n",
				a->name, addr, ix->amap[i].start);
	}
	add(&arenaentries, tot);
	add(&skipentries, nskip);
	sendp(arenadonechan, p);
}

/*
 * Convert score into relative bucket number in isect.
 * Can pass a packed ientry instead of score - score is first.
 */
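/*
 * Illustrative example (made-up numbers): if the top 32 bits
 * of the score hash to 0x12345678 and ix->div is 4096, the
 * global bucket is 0x12345678/4096 = 74565; subtracting
 * is->start yields the bucket relative to this section.
 */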
static u32int
score2bucket(ISect *is, uchar *score)
{
	u32int b;

	b = hashbits(score, 32)/ix->div;
	if(b < is->start || b >= is->stop){
		fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
			score, ix->div, b, is->start, is->stop);
	}
	assert(is->start <= b && b < is->stop);
	return b - is->start;
}

/*
 * Convert offset in index section to bucket number.
 */
static u32int
offset2bucket(ISect *is, u64int offset)
{
	u32int b;

	assert(is->blockbase <= offset);
	offset -= is->blockbase;
	b = offset/is->blocksize;
	assert(b < is->stop-is->start);
	return b;
}

/*
 * Convert bucket number to offset.
 */
static u64int
bucket2offset(ISect *is, u32int b)
{
	assert(b <= is->stop-is->start);
	return is->blockbase + (u64int)b*is->blocksize;
}

/*
 * IEntry buffers to hold initial round of spraying.
 */
typedef struct Buf Buf;
struct Buf
{
	Part *part;			/* partition being written */
	uchar *bp;			/* current block */
	uchar *ep;			/* end of block */
	uchar *wp;			/* write position in block */
	u64int boffset;		/* start offset */
	u64int woffset;		/* next write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* number of entries written */
};

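/*
 * Write the buffer's current block to the index partition
 * and reset the write pointer.  Running past eoffset means
 * more entries landed here than the region can hold.
 */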
static void
bflush(Buf *buf)
{
	u32int bufsize;

	if(buf->woffset >= buf->eoffset)
		sysfatal("buf index chunk overflow - need bigger index");
	bufsize = buf->ep - buf->bp;
	if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
		fprint(2, "write %s: %r\n", buf->part->name);
		errors = 1;
	}
	buf->woffset += bufsize;
	memset(buf->bp, 0, bufsize);
	buf->wp = buf->bp;
}

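/*
 * Append a single packed entry to the buffer,
 * flushing the block first if it is full.
 */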
static void
bwrite(Buf *buf, IEntry *ie)
{
	if(buf->wp+IEntrySize > buf->ep)
		bflush(buf);
	assert(buf->bp <= buf->wp && buf->wp < buf->ep);
	packientry(ie, buf->wp);
	buf->wp += IEntrySize;
	assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
	buf->nentry++;
}

/*
 * Minibuffer.  In-memory data structure holds our place
 * in the buffer but has no block data.  We are writing and
 * reading the minibuffers at the same time.  (Careful!)
 */
typedef struct Minibuf Minibuf;
struct Minibuf
{
	u64int boffset;		/* start offset */
	u64int roffset;		/* read offset */
	u64int woffset;		/* write offset */
	u64int eoffset;		/* end offset */
	u32int nentry;		/* # entries left to read */
	u32int nwentry;		/* # entries written */
};

/*
 * Index entry pool.  Used when trying to shuffle around
 * the entries in a big buffer into the corresponding M minibuffers.
 * Sized to hold M*EntriesPerBlock entries, so that there will always
 * either be room in the pool for another block worth of entries
 * or there will be an entire block worth of sorted entries to
 * write out.
 */
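/*
 * The pigeonhole argument: the pool holds (M+1) blocks worth
 * of links, so whenever fewer than a block's worth are free,
 * the M minibuffer lists together hold more than M blocks
 * worth, and at least one list must hold a full block.
 */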
typedef struct IEntryLink IEntryLink;
typedef struct IPool IPool;

struct IEntryLink
{
	uchar ie[IEntrySize];		/* raw IEntry */
	IEntryLink *next;		/* next in chain */
};

struct IPool
{
	ISect *isect;
	u32int buck0;			/* first bucket in pool */
	u32int mbufbuckets;		/* buckets per minibuf */
	IEntryLink *entry;		/* all IEntryLinks */
	u32int nentry;			/* # of IEntryLinks */
	IEntryLink *free;		/* free list */
	u32int nfree;			/* # on free list */
	Minibuf *mbuf;			/* all minibufs */
	u32int nmbuf;			/* # of minibufs */
	IEntryLink **mlist;		/* lists for each minibuf */
	u32int *mcount;			/* # on each mlist[i] */
	u32int bufsize;			/* block buffer size */
	uchar *rbuf;			/* read buffer */
	uchar *wbuf;			/* write buffer */
	u32int epbuf;			/* entries per block buffer */
};

/*
static int
countsokay(IPool *p)
{
	int i;
	u64int n;

	n = 0;
	for(i=0; i<p->nmbuf; i++)
		n += p->mcount[i];
	n += p->nfree;
	if(n != p->nentry){
		print("free %ud:", p->nfree);
		for(i=0; i<p->nmbuf; i++)
			print(" %ud", p->mcount[i]);
		print(" = %lld nentry: %ud\n", n, p->nentry);
	}
	return n == p->nentry;
}
*/

static IPool*
mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
	u32int mbufbuckets, u32int bufsize)
{
	u32int i, nentry;
	uchar *data;
	IPool *p;
	IEntryLink *l;

	nentry = (nmbuf+1)*bufsize / IEntrySize;
	p = ezmalloc(sizeof(IPool)
		+nentry*sizeof(IEntryLink)
		+nmbuf*sizeof(IEntryLink*)
		+nmbuf*sizeof(u32int)
		+3*bufsize);

	p->isect = isect;
	p->mbufbuckets = mbufbuckets;
	p->bufsize = bufsize;
	p->entry = (IEntryLink*)(p+1);
	p->nentry = nentry;
	p->mlist = (IEntryLink**)(p->entry+nentry);
	p->mcount = (u32int*)(p->mlist+nmbuf);
	p->nmbuf = nmbuf;
	p->mbuf = mbuf;
	data = (uchar*)(p->mcount+nmbuf);
	data += bufsize - (uintptr)data%bufsize;
	p->rbuf = data;
	p->wbuf = data+bufsize;
	p->epbuf = bufsize/IEntrySize;

	for(i=0; i<p->nentry; i++){
		l = &p->entry[i];
		l->next = p->free;
		p->free = l;
		p->nfree++;
	}
	return p;
}

/*
 * Add the index entry ie to the pool p.
 * Caller must know there is room.
 */
static void
ipoolinsert(IPool *p, uchar *ie)
{
	u32int buck, x;
	IEntryLink *l;

	assert(p->free != nil);

	buck = score2bucket(p->isect, ie);
	x = (buck-p->buck0) / p->mbufbuckets;
	if(x >= p->nmbuf){
		fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
			buck, p->mbufbuckets, x);
	}
	assert(x < p->nmbuf);

	l = p->free;
	p->free = l->next;
	p->nfree--;
	memmove(l->ie, ie, IEntrySize);
	l->next = p->mlist[x];
	p->mlist[x] = l;
	p->mcount[x]++;
}

/*
 * Pull out a block containing as many
 * entries as possible for minibuffer x.
 */
static u32int
ipoolgetbuf(IPool *p, u32int x)
{
	uchar *bp, *ep, *wp;
	IEntryLink *l;
	u32int n;

	bp = p->wbuf;
	ep = p->wbuf + p->bufsize;
	n = 0;
	assert(x < p->nmbuf);
	for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
		l = p->mlist[x];
		p->mlist[x] = l->next;
		p->mcount[x]--;
		memmove(wp, l->ie, IEntrySize);
		l->next = p->free;
		p->free = l;
		p->nfree++;
		n++;
	}
	memset(wp, 0, ep-wp);
	return n;
}

/*
 * Read a block worth of entries from the minibuf
 * into the pool.  Caller must know there is room.
 */
static void
ipoolloadblock(IPool *p, Minibuf *mb)
{
	u32int i, n;

	assert(mb->nentry > 0);
	assert(mb->roffset >= mb->woffset);
	assert(mb->roffset < mb->eoffset);

	n = p->bufsize/IEntrySize;
	if(n > mb->nentry)
		n = mb->nentry;
	if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
		fprint(2, "readpart %s: %r\n", p->isect->part->name);
	else{
		for(i=0; i<n; i++)
			ipoolinsert(p, p->rbuf+i*IEntrySize);
	}
	mb->nentry -= n;
	mb->roffset += p->bufsize;
}

/*
 * Write out a block worth of entries to minibuffer x.
 * If necessary, pick up the data there before overwriting it.
 */
static void
ipoolflush0(IPool *pool, u32int x)
{
	u32int bufsize;
	Minibuf *mb;

	mb = pool->mbuf+x;
	bufsize = pool->bufsize;
	mb->nwentry += ipoolgetbuf(pool, x);
	if(mb->nentry > 0 && mb->roffset == mb->woffset){
		assert(pool->nfree >= pool->bufsize/IEntrySize);
		/*
		 * There will be room in the pool -- we just
		 * removed a block worth.
		 */
		ipoolloadblock(pool, mb);
	}
	if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
		fprint(2, "writepart %s: %r\n", pool->isect->part->name);
	mb->woffset += bufsize;
}

/*
 * Write out some full block of entries.
 * (There must be one -- the pool is almost full!)
 */
static void
ipoolflush1(IPool *pool)
{
	u32int i;

	assert(pool->nfree <= pool->epbuf);

	for(i=0; i<pool->nmbuf; i++){
		if(pool->mcount[i] >= pool->epbuf){
			ipoolflush0(pool, i);
			return;
		}
	}
	/* can't be reached - someone must be full */
	sysfatal("ipoolflush1");
}

/*
 * Flush all the entries in the pool out to disk.
 * Nothing more to read from disk.
 */
static void
ipoolflush(IPool *pool)
{
	u32int i;

	for(i=0; i<pool->nmbuf; i++)
		while(pool->mlist[i])
			ipoolflush0(pool, i);
	assert(pool->nfree == pool->nentry);
}

/*
 * Third pass.  Pick up each minibuffer from disk into
 * memory and then write out the buckets.
 */

/*
 * Compare two packed index entries.
 * Usual ordering except break ties by putting higher
 * index addresses first (assumes have duplicates
 * due to corruption in the lower addresses).
 */
static int
ientrycmpaddr(const void *va, const void *vb)
{
	int i;
	uchar *a, *b;

	a = (uchar*)va;
	b = (uchar*)vb;
	i = ientrycmp(a, b);
	if(i)
		return i;
	return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
}

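/*
 * Write zeros over the index partition between offsets
 * o and e, one chunk at a time.
 */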
static void
zerorange(Part *p, u64int o, u64int e)
{
	static uchar zero[MaxIoSize];
	u32int n;

	for(; o<e; o+=n){
		n = sizeof zero;
		if(o+n > e)
			n = e-o;
		if(writepart(p, o, zero, n) < 0)
			fprint(2, "writepart %s: %r\n", p->name);
	}
}

/*
 * Load a minibuffer into memory and write out the
 * corresponding buckets.
 */
static void
sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
{
	uchar *buckdata, *p, *q, *ep;
	u32int b, lastb, memsize, n;
	u64int o;
	IBucket ib;
	Part *part;

	if(mb->nwentry == 0)
		return;

	part = is->part;
	buckdata = emalloc(is->blocksize);

	/*
	 * read entire buffer.
	 */
	assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
	assert(mb->woffset-mb->boffset <= nbuf);
	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
		fprint(2, "readpart %s: %r\n", part->name);
		errors = 1;
		free(buckdata);
		return;
	}
	assert(*(uint*)buf != 0xa5a5a5a5);

	/*
	 * remove fragmentation due to IEntrySize
	 * not evenly dividing Bufsize
	 */
	memsize = (bufsize/IEntrySize)*IEntrySize;
	for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
		memmove(p, q, memsize);
		p += memsize;
		q += bufsize;
	}
	ep = buf + mb->nwentry*IEntrySize;
	assert(ep <= buf+nbuf);

	/*
	 * sort entries
	 */
	qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);

	/*
	 * write buckets out
	 */
	n = 0;
	lastb = offset2bucket(is, mb->boffset);
	for(p=buf; p<ep; p=q){
		b = score2bucket(is, p);
		for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
			;
		if(lastb+1 < b && zero)
			zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
		if(IBucketSize+(q-p) > is->blocksize)
			sysfatal("bucket overflow - make index bigger");
		memmove(buckdata+IBucketSize, p, q-p);
		ib.n = (q-p)/IEntrySize;
		n += ib.n;
		packibucket(&ib, buckdata, is->bucketmagic);
		if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
			fprint(2, "write %s: %r\n", part->name);
		lastb = b;
	}
	if(lastb+1 < is->stop-is->start && zero)
		zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));

	if(n != mb->nwentry)
		fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n",
			n, mb->nwentry, (ep-buf)/IEntrySize);

	free(buckdata);
}

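/*
 * Per-index-section worker: receive entries from the arena
 * procs, spray them into large sequential buffers, reorder
 * through minibuffers if needed, then sort each minibuffer
 * into on-disk buckets.
 */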
static void
isectproc(void *v)
{
	u32int buck, bufbuckets, bufsize, epbuf, i, j;
	u32int mbufbuckets, n, nbucket, nn, space;
	u32int nbuf, nminibuf, xminiclump, prod;
	u64int blocksize, offset, xclump;
	uchar *data, *p;
	Buf *buf;
	IEntry ie;
	IPool *ipool;
	ISect *is;
	Minibuf *mbuf, *mb;

	is = v;
	blocksize = is->blocksize;
	nbucket = is->stop - is->start;

	/*
	 * Three passes:
	 *	pass 1 - write index entries from arenas into
	 *		large sequential sections on index disk.
	 *		requires nbuf * bufsize memory.
	 *
	 *	pass 2 - split each section into minibufs.
	 *		requires nminibuf * bufsize memory.
	 *
	 *	pass 3 - read each minibuf into memory and
	 *		write buckets out.
	 *		requires entries/minibuf * IEntrySize memory.
	 *
	 * The larger we set bufsize the less seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the less
	 * seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the
	 * more entries we end up with in each minibuf
	 * at the end.
	 *
	 * Shoot for using half our memory to hold each
	 * minibuf.  The chance of a random distribution
	 * getting off by 2x is quite low.
	 *
	 * Once that is decided, figure out the smallest
	 * nminibuf and nsection/biggest bufsize we can use
	 * and still fit in the memory constraints.
	 */
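	/*
	 * Illustrative example (made-up numbers): if this section
	 * expects xclump = 100M entries and half of isectmem holds
	 * 4M entries, then prod = 25 minibufs.  25*MinBufSize is
	 * well under isectmem, so the second pass is skipped:
	 * nbuf = 25 sequential sections, nminibuf = 1.
	 */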

	/* expected number of clump index entries we'll see */
	xclump = nbucket * (double)totalclumps/totalbuckets;

	/* number of clumps we want to see in a minibuf */
	xminiclump = isectmem/2/IEntrySize;

	/* total number of minibufs we need */
	prod = (xclump+xminiclump-1) / xminiclump;

	/* if possible, skip second pass */
	if(!dumb && prod*MinBufSize < isectmem){
		nbuf = prod;
		nminibuf = 1;
	}else{
		/* otherwise use nsection = sqrt(nmini) */
		for(nbuf=1; nbuf*nbuf<prod; nbuf++)
			;
		if(nbuf*MinBufSize > isectmem)
			sysfatal("not enough memory");
		nminibuf = nbuf;
	}
	if(nbuf == 0){
		fprint(2, "%s: brand-new index, no work to do\n", argv0);
		threadexitsall(0);
	}

	/* size buffer to use extra memory */
	bufsize = MinBufSize;
	while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
		bufsize *= 2;
	data = emalloc(nbuf*bufsize);
	epbuf = bufsize/IEntrySize;
	fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
		is->part->name, nbucket, nbuf, nminibuf, bufsize);
	/*
	 * Accept index entries from arena procs.
	 */
	buf = MKNZ(Buf, nbuf);
	p = data;
	offset = is->blockbase;
	bufbuckets = (nbucket+nbuf-1)/nbuf;
	for(i=0; i<nbuf; i++){
		buf[i].part = is->part;
		buf[i].bp = p;
		buf[i].wp = p;
		p += bufsize;
		buf[i].ep = p;
		buf[i].boffset = offset;
		buf[i].woffset = offset;
		if(i < nbuf-1){
			offset += bufbuckets*blocksize;
			buf[i].eoffset = offset;
		}else{
			offset = is->blockbase + nbucket*blocksize;
			buf[i].eoffset = offset;
		}
	}
	assert(p == data+nbuf*bufsize);

	n = 0;
	while(recv(is->writechan, &ie) == 1){
		if(ie.ia.addr == 0)
			break;
		buck = score2bucket(is, ie.score);
		i = buck/bufbuckets;
		assert(i < nbuf);
		bwrite(&buf[i], &ie);
		n++;
	}
	add(&indexentries, n);

	nn = 0;
	for(i=0; i<nbuf; i++){
		bflush(&buf[i]);
		buf[i].bp = nil;
		buf[i].ep = nil;
		buf[i].wp = nil;
		nn += buf[i].nentry;
	}
	if(n != nn)
		fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);

	free(data);

	fprint(2, "%T %s: reordering\n", is->part->name);

	/*
	 * Rearrange entries into minibuffers and then
	 * split each minibuffer into buckets.
	 * The minibuffer must be sized so that it is
	 * a multiple of blocksize -- ipoolloadblock assumes
	 * that each minibuf starts aligned on a blocksize
	 * boundary.
	 */
	mbuf = MKN(Minibuf, nminibuf);
	mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
	while(mbufbuckets*blocksize % bufsize)
		mbufbuckets++;
	for(i=0; i<nbuf; i++){
		/*
		 * Set up descriptors.
		 */
		n = buf[i].nentry;
		nn = 0;
		offset = buf[i].boffset;
		memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			mb->boffset = offset;
			offset += mbufbuckets*blocksize;
			if(offset > buf[i].eoffset)
				offset = buf[i].eoffset;
			mb->eoffset = offset;
			mb->roffset = mb->boffset;
			mb->woffset = mb->boffset;
			mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
			if(mb->nentry > buf[i].nentry)
				mb->nentry = buf[i].nentry;
			buf[i].nentry -= mb->nentry;
			nn += mb->nentry;
		}
		if(n != nn)
			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);
		/*
		 * Rearrange.
		 */
		if(!dumb && nminibuf == 1){
			mbuf[0].nwentry = mbuf[0].nentry;
			mbuf[0].woffset = buf[i].woffset;
		}else{
			ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
			ipool->buck0 = bufbuckets*i;
			for(j=0; j<nminibuf; j++){
				mb = &mbuf[j];
				while(mb->nentry > 0){
					if(ipool->nfree < epbuf){
						ipoolflush1(ipool);
						/* ipoolflush1 might change mb->nentry */
						continue;
					}
					assert(ipool->nfree >= epbuf);
					ipoolloadblock(ipool, mb);
				}
			}
			ipoolflush(ipool);
			nn = 0;
			for(j=0; j<nminibuf; j++)
				nn += mbuf[j].nwentry;
			if(n != nn)
				fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
			free(ipool);
		}

		/*
		 * Make buckets.
		 */
		space = 0;
		for(j=0; j<nminibuf; j++)
			if(space < mbuf[j].woffset - mbuf[j].boffset)
				space = mbuf[j].woffset - mbuf[j].boffset;

		data = emalloc(space);
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			sortminibuffer(is, mb, data, space, bufsize);
		}
		free(data);
	}

	sendp(isectdonechan, is);
}
967