xref: /csrg-svn/lib/libc/db/btree/bt_split.c (revision 50989)
146134Smao /*-
246134Smao  * Copyright (c) 1990 The Regents of the University of California.
346134Smao  * All rights reserved.
446134Smao  *
546134Smao  * This code is derived from software contributed to Berkeley by
646134Smao  * Mike Olson.
746134Smao  *
846134Smao  * %sccs.include.redist.c%
946134Smao  */
1046134Smao 
1146134Smao #if defined(LIBC_SCCS) && !defined(lint)
12*50989Sbostic static char sccsid[] = "@(#)bt_split.c	5.3 (Berkeley) 09/04/91";
1346134Smao #endif /* LIBC_SCCS and not lint */
1446134Smao 
1546134Smao #include <sys/types.h>
16*50989Sbostic #define	__DBINTERFACE_PRIVATE
1746134Smao #include <db.h>
18*50989Sbostic #include <limits.h>
19*50989Sbostic #include <stdio.h>
2046561Sbostic #include <stdlib.h>
2146561Sbostic #include <string.h>
2246134Smao #include "btree.h"
2346134Smao 
24*50989Sbostic static int	 bt_preserve __P((BTREE *, pgno_t));
25*50989Sbostic static PAGE	*bt_psplit __P((BTREE *, PAGE *, PAGE *, PAGE *, int *));
26*50989Sbostic static PAGE	*bt_page __P((BTREE *, PAGE *, PAGE **, PAGE **, int *));
27*50989Sbostic static PAGE	*bt_root __P((BTREE *, PAGE *, PAGE **, PAGE **, int *));
28*50989Sbostic static int	 bt_rroot __P((BTREE *, PAGE *, PAGE *, PAGE *));
29*50989Sbostic static int	 bt_broot __P((BTREE *, PAGE *, PAGE *, PAGE *));
30*50989Sbostic static recno_t	 rec_total __P((PAGE *));
31*50989Sbostic 
32*50989Sbostic #ifdef STATISTICS
33*50989Sbostic u_long	bt_rootsplit, bt_split, bt_sortsplit, bt_pfxsaved;
34*50989Sbostic #endif
35*50989Sbostic 
3646134Smao /*
37*50989Sbostic  * __BT_SPLIT -- Split the tree.
3846134Smao  *
39*50989Sbostic  * Parameters:
40*50989Sbostic  *	t:	tree
41*50989Sbostic  *	h:	page to split
42*50989Sbostic  *	key:	key to insert
43*50989Sbostic  *	data:	data to insert
44*50989Sbostic  *	flags:	BIGKEY/BIGDATA flags
45*50989Sbostic  *	nbytes:	length of insertion
46*50989Sbostic  *	skip:	index to leave open
4746134Smao  *
48*50989Sbostic  * Returns:
49*50989Sbostic  *	RET_ERROR, RET_SUCCESS
5046134Smao  */
5146134Smao int
52*50989Sbostic __bt_split(t, h, key, data, flags, nbytes, skip)
53*50989Sbostic 	BTREE *t;
54*50989Sbostic 	PAGE *h;
55*50989Sbostic 	const DBT *key, *data;
56*50989Sbostic 	u_long flags;
57*50989Sbostic 	size_t nbytes;
58*50989Sbostic 	int skip;
5946134Smao {
60*50989Sbostic 	BINTERNAL *bi;
61*50989Sbostic 	BLEAF *bl;
62*50989Sbostic 	DBT a, b;
63*50989Sbostic 	EPGNO *parent;
64*50989Sbostic 	PAGE *l, *r, *lchild, *rchild;
65*50989Sbostic 	index_t nxtindex;
66*50989Sbostic 	size_t nksize;
67*50989Sbostic 	int nosplit;
68*50989Sbostic 	char *dest;
6946134Smao 
7046134Smao 	/*
71*50989Sbostic 	 * Split the page into two pages, l and r.  The split routines return
72*50989Sbostic 	 * a pointer to the page into which the key should be inserted and skip
73*50989Sbostic 	 * set to the offset which should be used.  Additionally, l and r are
74*50989Sbostic 	 * pinned.
7546134Smao 	 */
76*50989Sbostic 	h = h->pgno == P_ROOT ?
77*50989Sbostic 	    bt_root(t, h, &l, &r, &skip) : bt_page(t, h, &l, &r, &skip);
78*50989Sbostic 	if (h == NULL)
7946134Smao 		return (RET_ERROR);
8046134Smao 
81*50989Sbostic 	/*
82*50989Sbostic 	 * Grab the space and insert the [rb]leaf structure.  Always a [rb]leaf
83*50989Sbostic 	 * structure since key inserts always cause a leaf page to split first.
84*50989Sbostic 	 */
85*50989Sbostic 	h->linp[skip] = h->upper -= nbytes;
86*50989Sbostic 	dest = (char *)h + h->upper;
87*50989Sbostic 	if (ISSET(t, BTF_RECNO)) {
88*50989Sbostic 		WR_RLEAF(dest, data, flags)
89*50989Sbostic 		++t->bt_nrecs;
90*50989Sbostic 		SET(t, BTF_METADIRTY | BTF_MODIFIED);
9146134Smao 	} else {
92*50989Sbostic 		WR_BLEAF(dest, key, data, flags)
93*50989Sbostic 		SET(t, BTF_MODIFIED);
9446134Smao 	}
9546134Smao 
9646134Smao 	/*
97*50989Sbostic 	 * Now we walk the parent page stack -- a LIFO stack of the pages that
98*50989Sbostic 	 * were traversed when we searched for the page that split.  Each stack
99*50989Sbostic 	 * entry is a page number and a page index offset.  The offset is for
100*50989Sbostic 	 * the page traversed on the search.  We've just split a page, so we
101*50989Sbostic 	 * have to insert a new key into the parent page.
102*50989Sbostic 	 *
103*50989Sbostic 	 * If the insert into the parent page causes it to split, may have to
104*50989Sbostic 	 * continue splitting all the way up the tree.  We stop if the root
105*50989Sbostic 	 * splits or the page inserted into didn't have to split to hold the
106*50989Sbostic 	 * new key.  Some algorithms replace the key for the old page as well
107*50989Sbostic 	 * as the new page.  We don't, as there's no reason to believe that the
108*50989Sbostic 	 * first key on the old page is any better than the key we have, and,
109*50989Sbostic 	 * in the case of a key being placed at index 0 causing the split, the
110*50989Sbostic 	 * key is unavailable.
111*50989Sbostic 	 *
112*50989Sbostic 	 * There are a maximum of 5 pages pinned at any time.  We keep the left
113*50989Sbostic 	 * and right pages pinned while working on the parent.   The 5 are the
114*50989Sbostic 	 * two children, left parent and right parent (when the parent splits)
115*50989Sbostic 	 * and the root page or the overflow key page when calling bt_preserve.
116*50989Sbostic 	 * This code must make sure that all pins are released other than the
117*50989Sbostic 	 * root page or overflow page which is unlocked elsewhere.
11846134Smao 	 */
119*50989Sbostic 	for (nosplit = 0; (parent = BT_POP(t)) != NULL;) {
120*50989Sbostic 		lchild = l;
121*50989Sbostic 		rchild = r;
12246134Smao 
123*50989Sbostic 		/* Get the parent page. */
124*50989Sbostic 		if ((h = mpool_get(t->bt_mp, parent->pgno, 0)) == NULL)
125*50989Sbostic 			goto err2;
12646134Smao 
127*50989Sbostic 	 	/* The new key goes ONE AFTER the index. */
128*50989Sbostic 		skip = parent->index + 1;
12946134Smao 
130*50989Sbostic 		/*
131*50989Sbostic 		 * Calculate the space needed on the parent page.
132*50989Sbostic 		 *
133*50989Sbostic 		 * Space hack when insertin into BINTERNAL pages.  Only need to
134*50989Sbostic 		 * retain the number of bytes that will distinguish between the
135*50989Sbostic 		 * new entry and the LAST entry on the page to its left.  If
136*50989Sbostic 		 * the keys compare equal, only need to retain one byte as a
137*50989Sbostic 		 * placeholder.  Special cases are that the entire key must be
138*50989Sbostic 		 * retained for the next-to-leftmost key on the leftmost page
139*50989Sbostic 		 * of each level, or the search will fail, and can't mess with
140*50989Sbostic 		 * overflow keys.
141*50989Sbostic 		 */
142*50989Sbostic 		switch (rchild->flags & P_TYPE) {
143*50989Sbostic 		case P_BINTERNAL:
144*50989Sbostic 			bi = GETBINTERNAL(rchild, 0);
145*50989Sbostic 			nbytes = NBINTERNAL(bi->ksize);
146*50989Sbostic 			if (t->bt_pfx && (h->prevpg != P_INVALID || skip > 1) &&
147*50989Sbostic 			    !(bi->flags & P_BIGKEY)) {
148*50989Sbostic 				BINTERNAL *tbi;
149*50989Sbostic 				tbi =
150*50989Sbostic 				    GETBINTERNAL(lchild, NEXTINDEX(lchild) - 1);
151*50989Sbostic 				a.size = tbi->ksize;
152*50989Sbostic 				a.data = tbi->bytes;
153*50989Sbostic 				b.size = bi->ksize;
154*50989Sbostic 				b.data = bi->bytes;
155*50989Sbostic 				goto prefix;
15646134Smao 			}
157*50989Sbostic 			break;
158*50989Sbostic 		case P_BLEAF:
159*50989Sbostic 			bl = GETBLEAF(rchild, 0);
160*50989Sbostic 			nbytes = NBLEAF(bl);
161*50989Sbostic 			if (t->bt_pfx && (h->prevpg != P_INVALID || skip > 1) &&
162*50989Sbostic 			    !(bl->flags & P_BIGKEY)) {
163*50989Sbostic 				BLEAF *tbl;
164*50989Sbostic 				size_t n;
165*50989Sbostic 
166*50989Sbostic 				tbl = GETBLEAF(lchild, NEXTINDEX(lchild) - 1);
167*50989Sbostic 				a.size = tbl->ksize;
168*50989Sbostic 				a.data = tbl->bytes;
169*50989Sbostic 				b.size = bl->ksize;
170*50989Sbostic 				b.data = bl->bytes;
171*50989Sbostic prefix:				nksize = t->bt_pfx(&a, &b);
172*50989Sbostic 				n = NBINTERNAL(nksize);
173*50989Sbostic 				if (n < nbytes) {
174*50989Sbostic #ifdef STATISTICS
175*50989Sbostic 					bt_pfxsaved += nbytes - n;
176*50989Sbostic #endif
177*50989Sbostic 					nbytes = n;
178*50989Sbostic 				} else
179*50989Sbostic 					nksize = 0;
180*50989Sbostic 			} else
181*50989Sbostic 				nksize = 0;
182*50989Sbostic 			break;
183*50989Sbostic 		case P_RINTERNAL:
184*50989Sbostic 		case P_RLEAF:
185*50989Sbostic 			nbytes = NRINTERNAL;
186*50989Sbostic 			break;
187*50989Sbostic 		}
188*50989Sbostic 
189*50989Sbostic 		/* Split the parent page if necessary or shift the indices. */
190*50989Sbostic 		if (h->upper - h->lower < nbytes + sizeof(index_t)) {
191*50989Sbostic 			h = h->pgno == P_ROOT ?
192*50989Sbostic 			    bt_root(t, h, &l, &r, &skip) :
193*50989Sbostic 			    bt_page(t, h, &l, &r, &skip);
194*50989Sbostic 			if (h == NULL)
195*50989Sbostic 				goto err1;
19646134Smao 		} else {
197*50989Sbostic 			if (skip < (nxtindex = NEXTINDEX(h)))
198*50989Sbostic 				bcopy(h->linp + skip, h->linp + skip + 1,
199*50989Sbostic 				    (nxtindex - skip) * sizeof(index_t));
200*50989Sbostic 			h->lower += sizeof(index_t);
201*50989Sbostic 			nosplit = 1;
20246134Smao 		}
20346134Smao 
204*50989Sbostic 		/* Insert the key into the parent page. */
205*50989Sbostic 		switch(rchild->flags & P_TYPE) {
206*50989Sbostic 		case P_BINTERNAL:
207*50989Sbostic 			h->linp[skip] = h->upper -= nbytes;
208*50989Sbostic 			dest = (char *)h + h->linp[skip];
209*50989Sbostic 			bcopy(bi, dest, nbytes);
210*50989Sbostic 			if (nksize)
211*50989Sbostic 				((BINTERNAL *)dest)->ksize = nksize;
212*50989Sbostic 			((BINTERNAL *)dest)->pgno = rchild->pgno;
213*50989Sbostic 			break;
214*50989Sbostic 		case P_BLEAF:
215*50989Sbostic 			h->linp[skip] = h->upper -= nbytes;
216*50989Sbostic 			dest = (char *)h + h->linp[skip];
217*50989Sbostic 			WR_BINTERNAL(dest, nksize ? nksize : bl->ksize,
218*50989Sbostic 			    rchild->pgno, rchild->flags & P_OVERFLOW);
219*50989Sbostic 			bcopy(bl->bytes, dest, nksize ? nksize : bl->ksize);
220*50989Sbostic 			if (bl->flags & P_BIGKEY &&
221*50989Sbostic 			    bt_preserve(t, *(pgno_t *)bl->bytes) == RET_ERROR)
222*50989Sbostic 				goto err1;
223*50989Sbostic 			break;
224*50989Sbostic 		case P_RINTERNAL:
225*50989Sbostic 			/* Update both left and right page counts. */
226*50989Sbostic 			h->linp[skip] = h->upper -= nbytes;
227*50989Sbostic 			dest = (char *)h + h->linp[skip];
228*50989Sbostic 			((RINTERNAL *)dest)->nrecs = rec_total(rchild);
229*50989Sbostic 			((RINTERNAL *)dest)->pgno = rchild->pgno;
230*50989Sbostic 			dest = (char *)h + h->linp[skip - 1];
231*50989Sbostic 			((RINTERNAL *)dest)->nrecs = rec_total(lchild);
232*50989Sbostic 			((RINTERNAL *)dest)->pgno = lchild->pgno;
233*50989Sbostic 			break;
234*50989Sbostic 		case P_RLEAF:
235*50989Sbostic 			/* Update both left and right page counts. */
236*50989Sbostic 			h->linp[skip] = h->upper -= nbytes;
237*50989Sbostic 			dest = (char *)h + h->linp[skip];
238*50989Sbostic 			((RINTERNAL *)dest)->nrecs = NEXTINDEX(rchild);
239*50989Sbostic 			((RINTERNAL *)dest)->pgno = rchild->pgno;
240*50989Sbostic 			dest = (char *)h + h->linp[skip - 1];
241*50989Sbostic 			((RINTERNAL *)dest)->nrecs = NEXTINDEX(lchild);
242*50989Sbostic 			((RINTERNAL *)dest)->pgno = lchild->pgno;
243*50989Sbostic 			break;
244*50989Sbostic 		}
24546134Smao 
246*50989Sbostic 		/* Unpin the held pages. */
247*50989Sbostic 		if (nosplit) {
248*50989Sbostic 			mpool_put(t->bt_mp, h, MPOOL_DIRTY);
249*50989Sbostic 			break;
250*50989Sbostic 		}
251*50989Sbostic 		mpool_put(t->bt_mp, lchild, MPOOL_DIRTY);
252*50989Sbostic 		mpool_put(t->bt_mp, rchild, MPOOL_DIRTY);
253*50989Sbostic 	}
25446134Smao 
255*50989Sbostic 	/* Unpin the held pages. */
256*50989Sbostic 	mpool_put(t->bt_mp, l, MPOOL_DIRTY);
257*50989Sbostic 	mpool_put(t->bt_mp, r, MPOOL_DIRTY);
258*50989Sbostic 
259*50989Sbostic 	/*
260*50989Sbostic 	 * If it's a recno tree, increment the count on all remaining parent
261*50989Sbostic 	 * pages.  Otherwise, clear the stack.
262*50989Sbostic 	 */
263*50989Sbostic 	if (ISSET(t, BTF_RECNO))
264*50989Sbostic 		while  ((parent = BT_POP(t)) != NULL) {
265*50989Sbostic 			if ((h = mpool_get(t->bt_mp, parent->pgno, 0)) == NULL)
26646134Smao 				return (RET_ERROR);
267*50989Sbostic 			++GETRINTERNAL(h, parent->index)->nrecs;
268*50989Sbostic 			mpool_put(t->bt_mp, h, MPOOL_DIRTY);
26946134Smao 		}
270*50989Sbostic 	else
271*50989Sbostic 		BT_CLR(t);
272*50989Sbostic 	return (RET_SUCCESS);
27346134Smao 
27446134Smao 	/*
275*50989Sbostic 	 * If something fails in the above loop we were already walking back
276*50989Sbostic 	 * up the tree and the tree is now inconsistent.  Nothing much we can
277*50989Sbostic 	 * do about it but release any memory we're holding.
27846134Smao 	 */
279*50989Sbostic err1:	mpool_put(t->bt_mp, lchild, MPOOL_DIRTY);
280*50989Sbostic 	mpool_put(t->bt_mp, rchild, MPOOL_DIRTY);
28146134Smao 
282*50989Sbostic err2:	mpool_put(t->bt_mp, l, 0);
283*50989Sbostic 	mpool_put(t->bt_mp, r, 0);
284*50989Sbostic 	__dbpanic(t->bt_dbp);
285*50989Sbostic 	return (RET_ERROR);
28646134Smao }
28746134Smao 
28846134Smao /*
289*50989Sbostic  * BT_PAGE -- Split a non-root page of a btree.
29046134Smao  *
291*50989Sbostic  * Parameters:
292*50989Sbostic  *	t:	tree
293*50989Sbostic  *	h:	root page
294*50989Sbostic  *	lp:	pointer to left page pointer
295*50989Sbostic  *	rp:	pointer to right page pointer
296*50989Sbostic  *	skip:	pointer to index to leave open
29746134Smao  *
298*50989Sbostic  * Returns:
299*50989Sbostic  *	Pointer to page in which to insert or NULL on error.
30046134Smao  */
301*50989Sbostic static PAGE *
302*50989Sbostic bt_page(t, h, lp, rp, skip)
303*50989Sbostic 	BTREE *t;
304*50989Sbostic 	PAGE *h, **lp, **rp;
305*50989Sbostic 	int *skip;
30646134Smao {
307*50989Sbostic 	PAGE *l, *r, *tp;
308*50989Sbostic 	pgno_t npg;
30946134Smao 
310*50989Sbostic #ifdef STATISTICS
311*50989Sbostic 	++bt_split;
312*50989Sbostic #endif
313*50989Sbostic 	/* Put the new right page for the split into place. */
314*50989Sbostic 	if ((r = mpool_new(t->bt_mp, &npg)) == NULL)
315*50989Sbostic 		return (NULL);
316*50989Sbostic 	r->pgno = npg;
317*50989Sbostic 	r->lower = BTDATAOFF;
318*50989Sbostic 	r->upper = t->bt_psize;
319*50989Sbostic 	r->nextpg = h->nextpg;
320*50989Sbostic 	r->prevpg = h->pgno;
321*50989Sbostic 	r->flags = h->flags & P_TYPE;
32246134Smao 
323*50989Sbostic 	/*
324*50989Sbostic 	 * If we're splitting the last page on a level because we're appending
325*50989Sbostic 	 * a key to it (skip is NEXTINDEX()), it's likely that the data is
326*50989Sbostic 	 * sorted.  Adding an empty page on the side of the level is less work
327*50989Sbostic 	 * and can push the fill factor much higher than normal.  If we're
328*50989Sbostic 	 * wrong it's no big deal, we'll just do the split the right way next
329*50989Sbostic 	 * time.  It may look like it's equally easy to do a similar hack for
330*50989Sbostic 	 * reverse sorted data, that is, split the tree left, but it's not.
331*50989Sbostic 	 * Don't even try.
332*50989Sbostic 	 */
333*50989Sbostic 	if (h->nextpg == P_INVALID && *skip == NEXTINDEX(h)) {
334*50989Sbostic #ifdef STATISTICS
335*50989Sbostic 		++bt_sortsplit;
336*50989Sbostic #endif
337*50989Sbostic 		h->nextpg = r->pgno;
338*50989Sbostic 		r->lower = BTDATAOFF + sizeof(index_t);
339*50989Sbostic 		*skip = 0;
340*50989Sbostic 		*lp = h;
341*50989Sbostic 		*rp = r;
342*50989Sbostic 		return (r);
343*50989Sbostic 	}
34446134Smao 
345*50989Sbostic 	/* Put the new left page for the split into place. */
346*50989Sbostic 	if ((l = malloc(t->bt_psize)) == NULL) {
347*50989Sbostic 		mpool_put(t->bt_mp, r, 0);
348*50989Sbostic 		return (NULL);
349*50989Sbostic 	}
350*50989Sbostic 	l->pgno = h->pgno;
351*50989Sbostic 	l->nextpg = r->pgno;
352*50989Sbostic 	l->prevpg = h->prevpg;
353*50989Sbostic 	l->lower = BTDATAOFF;
354*50989Sbostic 	l->upper = t->bt_psize;
355*50989Sbostic 	l->flags = h->flags & P_TYPE;
35646134Smao 
357*50989Sbostic 	/* Fix up the previous pointer of the page after the split page. */
358*50989Sbostic 	if (h->nextpg != P_INVALID) {
359*50989Sbostic 		if ((tp = mpool_get(t->bt_mp, h->nextpg, 0)) == NULL) {
360*50989Sbostic 			free(l);
361*50989Sbostic 			/* XXX mpool_free(t->bt_mp, r->pgno); */
362*50989Sbostic 			return (NULL);
36346134Smao 		}
364*50989Sbostic 		tp->prevpg = r->pgno;
365*50989Sbostic 		mpool_put(t->bt_mp, tp, 0);
366*50989Sbostic 	}
36746134Smao 
368*50989Sbostic 	/*
369*50989Sbostic 	 * Split right.  The key/data pairs aren't sorted in the btree page so
370*50989Sbostic 	 * it's simpler to copy the data from the split page onto two new pages
371*50989Sbostic 	 * instead of copying half the data to the right page and compacting
372*50989Sbostic 	 * the left page in place.  Since the left page can't change, we have
373*50989Sbostic 	 * to swap the original and the allocated left page after the split.
374*50989Sbostic 	 */
375*50989Sbostic 	tp = bt_psplit(t, h, l, r, skip);
37646134Smao 
377*50989Sbostic 	/* Move the new left page onto the old left page. */
378*50989Sbostic 	bcopy(l, h, t->bt_psize);
379*50989Sbostic 	if (tp == l)
380*50989Sbostic 		tp = h;
381*50989Sbostic 	free(l);
382*50989Sbostic 
383*50989Sbostic 	*lp = h;
384*50989Sbostic 	*rp = r;
385*50989Sbostic 	return (tp);
38646134Smao }
38746134Smao 
38846134Smao /*
389*50989Sbostic  * BT_RSPLIT -- Split the root page of a btree.
39046134Smao  *
391*50989Sbostic  * Parameters:
392*50989Sbostic  *	t:	tree
393*50989Sbostic  *	h:	root page
394*50989Sbostic  *	lp:	pointer to left page pointer
395*50989Sbostic  *	rp:	pointer to right page pointer
396*50989Sbostic  *	skip:	pointer to index to leave open
39746134Smao  *
398*50989Sbostic  * Returns:
399*50989Sbostic  *	Pointer to page in which to insert or NULL on error.
40046134Smao  */
401*50989Sbostic static PAGE *
402*50989Sbostic bt_root(t, h, lp, rp, skip)
403*50989Sbostic 	BTREE *t;
404*50989Sbostic 	PAGE *h, **lp, **rp;
405*50989Sbostic 	int *skip;
40646134Smao {
407*50989Sbostic 	PAGE *l, *r, *tp;
408*50989Sbostic 	pgno_t lnpg, rnpg;
40946134Smao 
410*50989Sbostic #ifdef STATISTICS
411*50989Sbostic 	++bt_split;
412*50989Sbostic 	++bt_rootsplit;
413*50989Sbostic #endif
414*50989Sbostic 	/* Put the new left and right pages for the split into place. */
415*50989Sbostic 	if ((l = mpool_new(t->bt_mp, &lnpg)) == NULL ||
416*50989Sbostic 	    (r = mpool_new(t->bt_mp, &rnpg)) == NULL)
417*50989Sbostic 		return (NULL);
418*50989Sbostic 	l->pgno = lnpg;
419*50989Sbostic 	r->pgno = rnpg;
420*50989Sbostic 	l->nextpg = r->pgno;
421*50989Sbostic 	r->prevpg = l->pgno;
422*50989Sbostic 	l->prevpg = r->nextpg = P_INVALID;
423*50989Sbostic 	l->lower = r->lower = BTDATAOFF;
424*50989Sbostic 	l->upper = r->upper = t->bt_psize;
425*50989Sbostic 	l->flags = r->flags = h->flags & P_TYPE;
426*50989Sbostic 
427*50989Sbostic 	/* Split the root page. */
428*50989Sbostic 	tp = bt_psplit(t, h, l, r, skip);
429*50989Sbostic 
430*50989Sbostic 	/* Make the root page look right. */
431*50989Sbostic 	if ((ISSET(t, BTF_RECNO) ?
432*50989Sbostic 	    bt_rroot(t, h, l, r) : bt_broot(t, h, l, r)) == RET_ERROR)
433*50989Sbostic 		return (NULL);
434*50989Sbostic 
435*50989Sbostic 	*lp = l;
436*50989Sbostic 	*rp = r;
437*50989Sbostic 	return (tp);
43846134Smao }
43946134Smao 
44046134Smao /*
441*50989Sbostic  * BT_RROOT -- Fix up the recno root page after the split.
44246134Smao  *
443*50989Sbostic  * Parameters:
444*50989Sbostic  *	t:	tree
445*50989Sbostic  *	h:	root page
44646134Smao  *
447*50989Sbostic  * Returns:
448*50989Sbostic  *	RET_ERROR, RET_SUCCESS
44946134Smao  */
450*50989Sbostic static int
451*50989Sbostic bt_rroot(t, h, l, r)
452*50989Sbostic 	BTREE *t;
453*50989Sbostic 	PAGE *h, *l, *r;
45446134Smao {
455*50989Sbostic 	char *dest;
45646134Smao 
457*50989Sbostic 	/* Insert the left and right keys, set the header information. */
458*50989Sbostic 	h->linp[0] = h->upper = t->bt_psize - NRINTERNAL;
459*50989Sbostic 	dest = (char *)h + h->upper;
460*50989Sbostic 	WR_RINTERNAL(dest,
461*50989Sbostic 	    l->flags & P_RLEAF ? NEXTINDEX(l) : rec_total(l), l->pgno);
46246134Smao 
463*50989Sbostic 	h->linp[1] = h->upper -= NRINTERNAL;
464*50989Sbostic 	dest = (char *)h + h->upper;
465*50989Sbostic 	WR_RINTERNAL(dest,
466*50989Sbostic 	    r->flags & P_RLEAF ? NEXTINDEX(r) : rec_total(r), r->pgno);
46746134Smao 
468*50989Sbostic 	h->lower = BTDATAOFF + 2 * sizeof(index_t);
46946134Smao 
470*50989Sbostic 	/* Unpin the root page, set to recno internal page. */
471*50989Sbostic 	h->flags &= ~P_TYPE;
472*50989Sbostic 	h->flags |= P_RINTERNAL;
473*50989Sbostic 	mpool_put(t->bt_mp, h, MPOOL_DIRTY);
47446134Smao 
475*50989Sbostic 	return (RET_SUCCESS);
476*50989Sbostic }
47746134Smao 
478*50989Sbostic /*
479*50989Sbostic  * BT_BROOT -- Fix up the btree root page after the split.
480*50989Sbostic  *
481*50989Sbostic  * Parameters:
482*50989Sbostic  *	t:	tree
483*50989Sbostic  *	h:	root page
484*50989Sbostic  *
485*50989Sbostic  * Returns:
486*50989Sbostic  *	RET_ERROR, RET_SUCCESS
487*50989Sbostic  */
488*50989Sbostic static int
489*50989Sbostic bt_broot(t, h, l, r)
490*50989Sbostic 	BTREE *t;
491*50989Sbostic 	PAGE *h, *l, *r;
492*50989Sbostic {
493*50989Sbostic 	BINTERNAL *bi;
494*50989Sbostic 	BLEAF *bl;
495*50989Sbostic 	size_t nbytes;
496*50989Sbostic 	char *dest;
49746134Smao 
49846134Smao 	/*
499*50989Sbostic 	 * If the root page was a leaf page, change it into an internal page.
500*50989Sbostic 	 * We copy the key we split on (but not the key's data, in the case of
501*50989Sbostic 	 * a leaf page) to the new root page.  If the key is on an overflow
502*50989Sbostic 	 * page, mark the overflow chain so it isn't deleted when the leaf copy
503*50989Sbostic 	 * of the key is deleted.
504*50989Sbostic 	 *
505*50989Sbostic 	 * The btree comparison code guarantees that the left-most key on any
506*50989Sbostic 	 * level of the tree is never used, so it doesn't need to be filled
507*50989Sbostic 	 * in.  (This is not just convenience -- if the insert index is 0, we
508*50989Sbostic 	 * don't *have* a key to fill in.)  The right key is available because
509*50989Sbostic 	 * the split code guarantees not to split on the skipped index.
51046134Smao 	 */
511*50989Sbostic 	nbytes = LALIGN(sizeof(size_t) + sizeof(pgno_t) + sizeof(u_char));
512*50989Sbostic 	h->linp[0] = h->upper = t->bt_psize - nbytes;
513*50989Sbostic 	dest = (char *)h + h->upper;
514*50989Sbostic 	WR_BINTERNAL(dest, 0, l->pgno, 0);
51546134Smao 
516*50989Sbostic 	switch(h->flags & P_TYPE) {
517*50989Sbostic 	case P_BLEAF:
518*50989Sbostic 		bl = GETBLEAF(r, 0);
519*50989Sbostic 		nbytes = NBINTERNAL(bl->ksize);
520*50989Sbostic 		h->linp[1] = h->upper -= nbytes;
521*50989Sbostic 		dest = (char *)h + h->upper;
522*50989Sbostic 		WR_BINTERNAL(dest, bl->ksize, r->pgno, 0);
523*50989Sbostic 		bcopy(bl->bytes, dest, bl->ksize);
524*50989Sbostic 
525*50989Sbostic 		if (bl->flags & P_BIGKEY &&
526*50989Sbostic 		    bt_preserve(t, *(pgno_t *)bl->bytes) == RET_ERROR)
527*50989Sbostic 			return (RET_ERROR);
528*50989Sbostic 		break;
529*50989Sbostic 	case P_BINTERNAL:
530*50989Sbostic 		bi = GETBINTERNAL(r, 0);
531*50989Sbostic 		nbytes = NBINTERNAL(bi->ksize);
532*50989Sbostic 		h->linp[1] = h->upper -= nbytes;
533*50989Sbostic 		dest = (char *)h + h->upper;
534*50989Sbostic 		bcopy(bi, dest, nbytes);
535*50989Sbostic 		((BINTERNAL *)dest)->pgno = r->pgno;
536*50989Sbostic 		break;
53746134Smao 	}
538*50989Sbostic 	h->lower = BTDATAOFF + 2 * sizeof(index_t);
53946134Smao 
540*50989Sbostic 	/* Unpin the root page, set to btree internal page. */
541*50989Sbostic 	h->flags &= ~P_TYPE;
542*50989Sbostic 	h->flags |= P_BINTERNAL;
543*50989Sbostic 	mpool_put(t->bt_mp, h, MPOOL_DIRTY);
54446134Smao 
54546134Smao 	return (RET_SUCCESS);
54646134Smao }
54746134Smao 
54846134Smao /*
549*50989Sbostic  * BT_PSPLIT -- Do the real work of splitting the page.
55046134Smao  *
551*50989Sbostic  * Parameters:
552*50989Sbostic  *	t:	tree
553*50989Sbostic  *	h:	page to be split
554*50989Sbostic  *	l:	page to put lower half of data
555*50989Sbostic  *	r:	page to put upper half of data
556*50989Sbostic  *	skip:	pointer to index to leave open
55746134Smao  *
558*50989Sbostic  * Returns:
559*50989Sbostic  *	Pointer to page in which to insert.
56046134Smao  */
561*50989Sbostic static PAGE *
562*50989Sbostic bt_psplit(t, h, l, r, skip)
563*50989Sbostic 	BTREE *t;
564*50989Sbostic 	PAGE *h, *l, *r;
565*50989Sbostic 	int *skip;
56646134Smao {
567*50989Sbostic 	BINTERNAL *bi;
568*50989Sbostic 	BLEAF *bl;
569*50989Sbostic 	RLEAF *rl;
570*50989Sbostic 	EPGNO *c;
571*50989Sbostic 	PAGE *rval;
572*50989Sbostic 	index_t half, sval;
573*50989Sbostic 	size_t nbytes;
574*50989Sbostic 	void *src;
575*50989Sbostic 	int bigkeycnt, isbigkey, nxt, off, top;
57646134Smao 
57746134Smao 	/*
578*50989Sbostic 	 * Split the data to the left and right pages. Leave the skip index
579*50989Sbostic 	 * open and guarantee that the split doesn't happen on that index (the
580*50989Sbostic 	 * right key must be available for the parent page).  Additionally,
581*50989Sbostic 	 * make some effort not to split on an overflow key.  This makes it
582*50989Sbostic 	 * faster to process internal pages and can save space since overflow
583*50989Sbostic 	 * keys used by internal pages are never deleted.
58446134Smao 	 */
585*50989Sbostic 	bigkeycnt = 0;
586*50989Sbostic 	sval = *skip;
587*50989Sbostic 	half = (t->bt_psize - BTDATAOFF) / 2;
588*50989Sbostic 	for (nxt = off = 0, top = NEXTINDEX(h); nxt < top; ++off) {
589*50989Sbostic 		if (sval == off)
590*50989Sbostic 			continue;
591*50989Sbostic 		switch (h->flags & P_TYPE) {
592*50989Sbostic 		case P_BINTERNAL:
593*50989Sbostic 			src = bi = GETBINTERNAL(h, nxt);
594*50989Sbostic 			nbytes = NBINTERNAL(bi->ksize);
595*50989Sbostic 			isbigkey = bi->flags & P_BIGKEY;
596*50989Sbostic 			break;
597*50989Sbostic 		case P_BLEAF:
598*50989Sbostic 			src = bl = GETBLEAF(h, nxt);
599*50989Sbostic 			nbytes = NBLEAF(bl);
600*50989Sbostic 			isbigkey = bl->flags & P_BIGKEY;
601*50989Sbostic 			break;
602*50989Sbostic 		case P_RINTERNAL:
603*50989Sbostic 			src = GETRINTERNAL(h, nxt);
604*50989Sbostic 			nbytes = NRINTERNAL;
605*50989Sbostic 			isbigkey = 0;
606*50989Sbostic 			break;
607*50989Sbostic 		case P_RLEAF:
608*50989Sbostic 			src = rl = GETRLEAF(h, nxt);
609*50989Sbostic 			nbytes = NRLEAF(rl);
610*50989Sbostic 			isbigkey = 0;
611*50989Sbostic 			break;
612*50989Sbostic 		}
613*50989Sbostic 		++nxt;
614*50989Sbostic 		l->linp[off] = l->upper -= nbytes;
615*50989Sbostic 		bcopy(src, (char *)l + l->upper, nbytes);
61646134Smao 
617*50989Sbostic 		/* There's no empirical justification for the '3'. */
618*50989Sbostic 		if (half < nbytes)
619*50989Sbostic 			if (!isbigkey || bigkeycnt == 3)
620*50989Sbostic 				break;
621*50989Sbostic 			else
622*50989Sbostic 				++bigkeycnt;
623*50989Sbostic 		else
624*50989Sbostic 			half -= nbytes;
625*50989Sbostic 	}
626*50989Sbostic 	l->lower += (off + 1) * sizeof(index_t);
62746134Smao 
628*50989Sbostic 	/*
629*50989Sbostic 	 * If we're splitting the page that the cursor was on, have to adjust
630*50989Sbostic 	 * the cursor to point to the same record as before the split.
631*50989Sbostic 	 */
632*50989Sbostic 	c = &t->bt_bcursor;
633*50989Sbostic 	if (c->pgno == h->pgno)
634*50989Sbostic 		if (c->index < off)
635*50989Sbostic 			c->pgno = l->pgno;
636*50989Sbostic 		else {
637*50989Sbostic 			c->pgno = r->pgno;
638*50989Sbostic 			c->index -= off;
63946134Smao 		}
64046134Smao 
641*50989Sbostic 	/*
642*50989Sbostic 	 * Decide which page to return, and adjust the skip index if the
643*50989Sbostic 	 * to-be-inserted-upon page has changed.
644*50989Sbostic 	 */
645*50989Sbostic 	if (sval > off) {
646*50989Sbostic 		rval = r;
647*50989Sbostic 		*skip -= off + 1;
648*50989Sbostic 	} else
649*50989Sbostic 		rval = l;
65046134Smao 
651*50989Sbostic 	for (off = 0; nxt < top; ++off) {
652*50989Sbostic 		if (sval == nxt) {
653*50989Sbostic 			sval = 0;
654*50989Sbostic 			continue;
65546134Smao 		}
656*50989Sbostic 		switch (h->flags & P_TYPE) {
657*50989Sbostic 		case P_BINTERNAL:
658*50989Sbostic 			src = bi = GETBINTERNAL(h, nxt);
659*50989Sbostic 			nbytes = NBINTERNAL(bi->ksize);
660*50989Sbostic 			break;
661*50989Sbostic 		case P_BLEAF:
662*50989Sbostic 			src = bl = GETBLEAF(h, nxt);
663*50989Sbostic 			nbytes = NBLEAF(bl);
664*50989Sbostic 			break;
665*50989Sbostic 		case P_RINTERNAL:
666*50989Sbostic 			src = GETRINTERNAL(h, nxt);
667*50989Sbostic 			nbytes = NRINTERNAL;
668*50989Sbostic 			break;
669*50989Sbostic 		case P_RLEAF:
670*50989Sbostic 			src = rl = GETRLEAF(h, nxt);
671*50989Sbostic 			nbytes = NRLEAF(rl);
672*50989Sbostic 			break;
673*50989Sbostic 		}
674*50989Sbostic 		++nxt;
675*50989Sbostic 		r->linp[off] = r->upper -= nbytes;
676*50989Sbostic 		bcopy(src, (char *)r + r->upper, nbytes);
677*50989Sbostic 	}
678*50989Sbostic 	r->lower += off * sizeof(index_t);
67946134Smao 
680*50989Sbostic 	/* If the key is being appended to the page, adjust the index. */
681*50989Sbostic 	if (sval == top)
682*50989Sbostic 		r->lower += sizeof(index_t);
68346134Smao 
684*50989Sbostic 	return (rval);
685*50989Sbostic }
68646134Smao 
687*50989Sbostic /*
688*50989Sbostic  * BT_PRESERVE -- Mark a chain of pages as used by an internal node.
689*50989Sbostic  *
690*50989Sbostic  * Chains of indirect blocks pointed to by leaf nodes get reclaimed when the
691*50989Sbostic  * record that references them gets deleted.  Chains pointed to by internal
692*50989Sbostic  * pages never get deleted.  This routine marks a chain as pointed to by an
693*50989Sbostic  * internal page.
694*50989Sbostic  *
695*50989Sbostic  * Parameters:
696*50989Sbostic  *	t:	tree
697*50989Sbostic  *	pg:	page number of first page in the chain.
698*50989Sbostic  *
699*50989Sbostic  * Returns:
700*50989Sbostic  *	RET_SUCCESS, RET_ERROR.
701*50989Sbostic  */
702*50989Sbostic static int
703*50989Sbostic bt_preserve(t, pg)
704*50989Sbostic 	BTREE *t;
705*50989Sbostic 	pgno_t pg;
706*50989Sbostic {
707*50989Sbostic 	PAGE *h;
70846134Smao 
709*50989Sbostic 	if ((h = mpool_get(t->bt_mp, pg, 0)) == NULL)
710*50989Sbostic 		return (RET_ERROR);
711*50989Sbostic 	h->flags |= P_PRESERVE;
712*50989Sbostic 	mpool_put(t->bt_mp, h, MPOOL_DIRTY);
71346134Smao 	return (RET_SUCCESS);
71446134Smao }
715*50989Sbostic 
716*50989Sbostic /*
717*50989Sbostic  * REC_TOTAL -- Return the number of recno entries below a page.
718*50989Sbostic  *
719*50989Sbostic  * Parameters:
720*50989Sbostic  *	h:	page
721*50989Sbostic  *
722*50989Sbostic  * Returns:
723*50989Sbostic  *	The number of recno entries below a page.
724*50989Sbostic  *
725*50989Sbostic  * XXX
726*50989Sbostic  * These values could be set by the bt_psplit routine.  The problem is that the
727*50989Sbostic  * entry has to be popped off of the stack etc. or the values have to be passed
728*50989Sbostic  * all the way back to bt_split/bt_rroot and it's not very clean.
729*50989Sbostic  */
730*50989Sbostic static recno_t
731*50989Sbostic rec_total(h)
732*50989Sbostic 	PAGE *h;
733*50989Sbostic {
734*50989Sbostic 	recno_t recs;
735*50989Sbostic 	index_t nxt, top;
736*50989Sbostic 
737*50989Sbostic 	for (recs = 0, nxt = 0, top = NEXTINDEX(h); nxt < top; ++nxt)
738*50989Sbostic 		recs += GETRINTERNAL(h, nxt)->nrecs;
739*50989Sbostic 	return (recs);
740*50989Sbostic }
741