xref: /openbsd-src/usr.sbin/ldapd/btree.c (revision 0b7734b3d77bb9b21afec6f4621cae6c805dbd45)
1 /*	$OpenBSD: btree.c,v 1.36 2016/03/20 00:01:22 krw Exp $ */
2 
3 /*
4  * Copyright (c) 2009, 2010 Martin Hedenfalk <martin@bzero.se>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/tree.h>
21 #include <sys/stat.h>
22 #include <sys/queue.h>
23 #include <sys/uio.h>
24 
25 #include <assert.h>
26 #include <err.h>
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <stddef.h>
30 #include <stdint.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <time.h>
35 #include <unistd.h>
36 
37 #include "btree.h"
38 
39 /* #define DEBUG */
40 
41 #ifdef DEBUG
42 # define DPRINTF(...)	do { fprintf(stderr, "%s:%d: ", __func__, __LINE__); \
43 			     fprintf(stderr, __VA_ARGS__); \
44 			     fprintf(stderr, "\n"); } while(0)
45 #else
46 # define DPRINTF(...)
47 #endif
48 
49 #define PAGESIZE	 4096
50 #define BT_MINKEYS	 4
51 #define BT_MAGIC	 0xB3DBB3DB
52 #define BT_VERSION	 4
53 #define MAXKEYSIZE	 255
54 
55 #define P_INVALID	 0xFFFFFFFF
56 
57 #define F_ISSET(w, f)	 (((w) & (f)) == (f))
58 
59 typedef uint32_t	 pgno_t;
60 typedef uint16_t	 indx_t;
61 
62 /* There are four page types: meta, index, leaf and overflow.
63  * They all share the same page header.
64  */
65 struct page {				/* represents an on-disk page */
66 	pgno_t		 pgno;		/* page number */
67 #define	P_BRANCH	 0x01		/* branch page */
68 #define	P_LEAF		 0x02		/* leaf page */
69 #define	P_OVERFLOW	 0x04		/* overflow page */
70 #define	P_META		 0x08		/* meta page */
71 #define	P_HEAD		 0x10		/* header page */
72 	uint32_t	 flags;
73 #define lower		 b.fb.fb_lower
74 #define upper		 b.fb.fb_upper
75 #define p_next_pgno	 b.pb_next_pgno
76 	union page_bounds {
77 		struct {
78 			indx_t	 fb_lower;	/* lower bound of free space */
79 			indx_t	 fb_upper;	/* upper bound of free space */
80 		} fb;
81 		pgno_t		 pb_next_pgno;	/* overflow page linked list */
82 	} b;
83 	indx_t		 ptrs[1];		/* dynamic size */
84 } __packed;
85 
86 #define PAGEHDRSZ	 offsetof(struct page, ptrs)
87 
88 #define NUMKEYSP(p)	 (((p)->lower - PAGEHDRSZ) >> 1)
89 #define NUMKEYS(mp)	 (((mp)->page->lower - PAGEHDRSZ) >> 1)
90 #define SIZELEFT(mp)	 (indx_t)((mp)->page->upper - (mp)->page->lower)
91 #define PAGEFILL(bt, mp) (1000 * ((bt)->head.psize - PAGEHDRSZ - SIZELEFT(mp)) / \
92 				((bt)->head.psize - PAGEHDRSZ))
93 #define IS_LEAF(mp)	 F_ISSET((mp)->page->flags, P_LEAF)
94 #define IS_BRANCH(mp)	 F_ISSET((mp)->page->flags, P_BRANCH)
95 #define IS_OVERFLOW(mp)	 F_ISSET((mp)->page->flags, P_OVERFLOW)
96 
97 struct bt_head {				/* header page content */
98 	uint32_t	 magic;
99 	uint32_t	 version;
100 	uint32_t	 flags;
101 	uint32_t	 psize;			/* page size */
102 } __packed;
103 
104 struct bt_meta {				/* meta (footer) page content */
105 #define BT_TOMBSTONE	 0x01			/* file is replaced */
106 	uint32_t	 flags;
107 	pgno_t		 root;			/* page number of root page */
108 	pgno_t		 prev_meta;		/* previous meta page number */
109 	time_t		 created_at;
110 	uint32_t	 branch_pages;
111 	uint32_t	 leaf_pages;
112 	uint32_t	 overflow_pages;
113 	uint32_t	 revisions;
114 	uint32_t	 depth;
115 	uint64_t	 entries;
116 	unsigned char	 hash[SHA_DIGEST_LENGTH];
117 } __packed;
118 
119 struct btkey {
120 	size_t			 len;
121 	char			 str[MAXKEYSIZE];
122 };
123 
124 struct mpage {					/* an in-memory cached page */
125 	RB_ENTRY(mpage)		 entry;		/* page cache entry */
126 	SIMPLEQ_ENTRY(mpage)	 next;		/* queue of dirty pages */
127 	TAILQ_ENTRY(mpage)	 lru_next;	/* LRU queue */
128 	struct mpage		*parent;	/* NULL if root */
129 	unsigned int		 parent_index;	/* keep track of node index */
130 	struct btkey		 prefix;
131 	struct page		*page;
132 	pgno_t			 pgno;		/* copy of page->pgno */
133 	short			 ref;		/* increased by cursors */
134 	short			 dirty;		/* 1 if on dirty queue */
135 };
136 RB_HEAD(page_cache, mpage);
137 SIMPLEQ_HEAD(dirty_queue, mpage);
138 TAILQ_HEAD(lru_queue, mpage);
139 
140 static int		 mpage_cmp(struct mpage *a, struct mpage *b);
141 static struct mpage	*mpage_lookup(struct btree *bt, pgno_t pgno);
142 static void		 mpage_add(struct btree *bt, struct mpage *mp);
143 static void		 mpage_free(struct mpage *mp);
144 static void		 mpage_del(struct btree *bt, struct mpage *mp);
145 static void		 mpage_flush(struct btree *bt);
146 static struct mpage	*mpage_copy(struct btree *bt, struct mpage *mp);
147 static void		 mpage_prune(struct btree *bt);
148 static void		 mpage_dirty(struct btree *bt, struct mpage *mp);
149 static struct mpage	*mpage_touch(struct btree *bt, struct mpage *mp);
150 
151 RB_PROTOTYPE(page_cache, mpage, entry, mpage_cmp);
152 RB_GENERATE(page_cache, mpage, entry, mpage_cmp);
153 
154 struct ppage {					/* ordered list of pages */
155 	SLIST_ENTRY(ppage)	 entry;
156 	struct mpage		*mpage;
157 	unsigned int		 ki;		/* cursor index on page */
158 };
159 SLIST_HEAD(page_stack, ppage);
160 
161 #define CURSOR_EMPTY(c)		 SLIST_EMPTY(&(c)->stack)
162 #define CURSOR_TOP(c)		 SLIST_FIRST(&(c)->stack)
163 #define CURSOR_POP(c)		 SLIST_REMOVE_HEAD(&(c)->stack, entry)
164 #define CURSOR_PUSH(c,p)	 SLIST_INSERT_HEAD(&(c)->stack, p, entry)
165 
166 struct cursor {
167 	struct btree		*bt;
168 	struct btree_txn	*txn;
169 	struct page_stack	 stack;		/* stack of parent pages */
170 	short			 initialized;	/* 1 if initialized */
171 	short			 eof;		/* 1 if end is reached */
172 };
173 
174 #define METAHASHLEN	 offsetof(struct bt_meta, hash)
175 #define METADATA(p)	 ((void *)((char *)p + PAGEHDRSZ))
176 
177 struct node {
178 #define n_pgno		 p.np_pgno
179 #define n_dsize		 p.np_dsize
180 	union {
181 		pgno_t		 np_pgno;	/* child page number */
182 		uint32_t	 np_dsize;	/* leaf data size */
183 	}		 p;
184 	uint16_t	 ksize;			/* key size */
185 #define F_BIGDATA	 0x01			/* data put on overflow page */
186 	uint8_t		 flags;
187 	char		 data[1];
188 } __packed;
189 
190 struct btree_txn {
191 	pgno_t			 root;		/* current / new root page */
192 	pgno_t			 next_pgno;	/* next unallocated page */
193 	struct btree		*bt;		/* btree is ref'd */
194 	struct dirty_queue	*dirty_queue;	/* modified pages */
195 #define BT_TXN_RDONLY		 0x01		/* read-only transaction */
196 #define BT_TXN_ERROR		 0x02		/* an error has occurred */
197 	unsigned int		 flags;
198 };
199 
200 struct btree {
201 	int			 fd;
202 	char			*path;
203 #define BT_FIXPADDING		 0x01		/* internal */
204 	unsigned int		 flags;
205 	bt_cmp_func		 cmp;		/* user compare function */
206 	struct bt_head		 head;
207 	struct bt_meta		 meta;
208 	struct page_cache	*page_cache;
209 	struct lru_queue	*lru_queue;
210 	struct btree_txn	*txn;		/* current write transaction */
211 	int			 ref;		/* increased by cursors & txn */
212 	struct btree_stat	 stat;
213 	off_t			 size;		/* current file size */
214 };
215 
216 #define NODESIZE	 offsetof(struct node, data)
217 
218 #define INDXSIZE(k)	 (NODESIZE + ((k) == NULL ? 0 : (k)->size))
219 #define LEAFSIZE(k, d)	 (NODESIZE + (k)->size + (d)->size)
220 #define NODEPTRP(p, i)	 ((struct node *)((char *)(p) + (p)->ptrs[i]))
221 #define NODEPTR(mp, i)	 NODEPTRP((mp)->page, i)
222 #define NODEKEY(node)	 (void *)((node)->data)
223 #define NODEDATA(node)	 (void *)((char *)(node)->data + (node)->ksize)
224 #define NODEPGNO(node)	 ((node)->p.np_pgno)
225 #define NODEDSZ(node)	 ((node)->p.np_dsize)
226 
227 #define BT_COMMIT_PAGES	 64	/* max number of pages to write in one commit */
228 #define BT_MAXCACHE_DEF	 1024	/* max number of pages to keep in cache  */
229 
230 static int		 btree_read_page(struct btree *bt, pgno_t pgno,
231 			    struct page *page);
232 static struct mpage	*btree_get_mpage(struct btree *bt, pgno_t pgno);
233 static int		 btree_search_page_root(struct btree *bt,
234 			    struct mpage *root, struct btval *key,
235 			    struct cursor *cursor, int modify,
236 			    struct mpage **mpp);
237 static int		 btree_search_page(struct btree *bt,
238 			    struct btree_txn *txn, struct btval *key,
239 			    struct cursor *cursor, int modify,
240 			    struct mpage **mpp);
241 
242 static int		 btree_write_header(struct btree *bt, int fd);
243 static int		 btree_read_header(struct btree *bt);
244 static int		 btree_is_meta_page(struct page *p);
245 static int		 btree_read_meta(struct btree *bt, pgno_t *p_next);
246 static int		 btree_write_meta(struct btree *bt, pgno_t root,
247 			    unsigned int flags);
248 static void		 btree_ref(struct btree *bt);
249 
250 static struct node	*btree_search_node(struct btree *bt, struct mpage *mp,
251 			    struct btval *key, int *exactp, unsigned int *kip);
252 static int		 btree_add_node(struct btree *bt, struct mpage *mp,
253 			    indx_t indx, struct btval *key, struct btval *data,
254 			    pgno_t pgno, uint8_t flags);
255 static void		 btree_del_node(struct btree *bt, struct mpage *mp,
256 			    indx_t indx);
257 static int		 btree_read_data(struct btree *bt, struct mpage *mp,
258 			    struct node *leaf, struct btval *data);
259 
260 static int		 btree_rebalance(struct btree *bt, struct mpage *mp);
261 static int		 btree_update_key(struct btree *bt, struct mpage *mp,
262 			    indx_t indx, struct btval *key);
263 static int		 btree_adjust_prefix(struct btree *bt,
264 			    struct mpage *src, int delta);
265 static int		 btree_move_node(struct btree *bt, struct mpage *src,
266 			    indx_t srcindx, struct mpage *dst, indx_t dstindx);
267 static int		 btree_merge(struct btree *bt, struct mpage *src,
268 			    struct mpage *dst);
269 static int		 btree_split(struct btree *bt, struct mpage **mpp,
270 			    unsigned int *newindxp, struct btval *newkey,
271 			    struct btval *newdata, pgno_t newpgno);
272 static struct mpage	*btree_new_page(struct btree *bt, uint32_t flags);
273 static int		 btree_write_overflow_data(struct btree *bt,
274 			    struct page *p, struct btval *data);
275 
276 static void		 cursor_pop_page(struct cursor *cursor);
277 static struct ppage	 *cursor_push_page(struct cursor *cursor,
278 			    struct mpage *mp);
279 
280 static int		 bt_set_key(struct btree *bt, struct mpage *mp,
281 			    struct node *node, struct btval *key);
282 static int		 btree_sibling(struct cursor *cursor, int move_right);
283 static int		 btree_cursor_next(struct cursor *cursor,
284 			    struct btval *key, struct btval *data);
285 static int		 btree_cursor_set(struct cursor *cursor,
286 			    struct btval *key, struct btval *data, int *exactp);
287 static int		 btree_cursor_first(struct cursor *cursor,
288 			    struct btval *key, struct btval *data);
289 
290 static void		 bt_reduce_separator(struct btree *bt, struct node *min,
291 			    struct btval *sep);
292 static void		 remove_prefix(struct btree *bt, struct btval *key,
293 			    size_t pfxlen);
294 static void		 expand_prefix(struct btree *bt, struct mpage *mp,
295 			    indx_t indx, struct btkey *expkey);
296 static void		 concat_prefix(struct btree *bt, char *s1, size_t n1,
297 			    char *s2, size_t n2, char *cs, size_t *cn);
298 static void		 common_prefix(struct btree *bt, struct btkey *min,
299 			    struct btkey *max, struct btkey *pfx);
300 static void		 find_common_prefix(struct btree *bt, struct mpage *mp);
301 
302 static size_t		 bt_leaf_size(struct btree *bt, struct btval *key,
303 			    struct btval *data);
304 static size_t		 bt_branch_size(struct btree *bt, struct btval *key);
305 
306 static pgno_t		 btree_compact_tree(struct btree *bt, pgno_t pgno,
307 			    struct btree *btc);
308 
309 static int		 memncmp(const void *s1, size_t n1,
310 				 const void *s2, size_t n2);
311 static int		 memnrcmp(const void *s1, size_t n1,
312 				  const void *s2, size_t n2);
313 
/* Compare two byte strings of possibly different lengths, memcmp()
 * style.  When one string is a prefix of the other, the shorter string
 * sorts first.
 *
 * Fix: the old code ended with memcmp(s1, s2, n1) even when n1 > n2,
 * reading up to n1 - n2 bytes past the end of s2 when the common
 * prefix differed.  Comparing min(n1, n2) bytes yields the same sign
 * without the overread.
 */
static int
memncmp(const void *s1, size_t n1, const void *s2, size_t n2)
{
	size_t		 n;
	int		 r;

	n = n1 < n2 ? n1 : n2;
	r = memcmp(s1, s2, n);
	if (r != 0)
		return r;
	/* Common prefix matches: order by length. */
	if (n1 < n2)
		return -1;
	if (n1 > n2)
		return 1;
	return 0;
}
327 
328 static int
329 memnrcmp(const void *s1, size_t n1, const void *s2, size_t n2)
330 {
331 	const unsigned char	*p1;
332 	const unsigned char	*p2;
333 
334 	if (n1 == 0)
335 		return n2 == 0 ? 0 : -1;
336 
337 	if (n2 == 0)
338 		return n1 == 0 ? 0 : 1;
339 
340 	p1 = (const unsigned char *)s1 + n1 - 1;
341 	p2 = (const unsigned char *)s2 + n2 - 1;
342 
343 	while (*p1 == *p2) {
344 		if (p1 == s1)
345 			return (p2 == s2) ? 0 : -1;
346 		if (p2 == s2)
347 			return (p1 == p2) ? 0 : 1;
348 		p1--;
349 		p2--;
350 	}
351 	return *p1 - *p2;
352 }
353 
/* Compare two keys using the btree's user-supplied compare function.
 * NOTE(review): assumes bt->cmp is non-NULL -- presumably only called
 * for btrees configured with a custom compare function; confirm with
 * callers.
 */
int
btree_cmp(struct btree *bt, const struct btval *a, const struct btval *b)
{
	return bt->cmp(a, b);
}
359 
360 static void
361 common_prefix(struct btree *bt, struct btkey *min, struct btkey *max,
362     struct btkey *pfx)
363 {
364 	size_t		 n = 0;
365 	char		*p1;
366 	char		*p2;
367 
368 	if (min->len == 0 || max->len == 0) {
369 		pfx->len = 0;
370 		return;
371 	}
372 
373 	if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
374 		p1 = min->str + min->len - 1;
375 		p2 = max->str + max->len - 1;
376 
377 		while (*p1 == *p2) {
378 			if (p1 < min->str || p2 < max->str)
379 				break;
380 			p1--;
381 			p2--;
382 			n++;
383 		}
384 
385 		assert(n <= (int)sizeof(pfx->str));
386 		pfx->len = n;
387 		bcopy(p2 + 1, pfx->str, n);
388 	} else {
389 		p1 = min->str;
390 		p2 = max->str;
391 
392 		while (*p1 == *p2) {
393 			if (n == min->len || n == max->len)
394 				break;
395 			p1++;
396 			p2++;
397 			n++;
398 		}
399 
400 		assert(n <= (int)sizeof(pfx->str));
401 		pfx->len = n;
402 		bcopy(max->str, pfx->str, n);
403 	}
404 }
405 
/* Strip <pfxlen> bytes of page prefix from <key> in place.  No-op when
 * there is no prefix or when a user-supplied compare function is set
 * (prefix compression is not used with custom comparators).
 */
static void
remove_prefix(struct btree *bt, struct btval *key, size_t pfxlen)
{
	if (pfxlen == 0 || bt->cmp != NULL)
		return;

	DPRINTF("removing %zu bytes of prefix from key [%.*s]", pfxlen,
	    (int)key->size, (char *)key->data);
	assert(pfxlen <= key->size);
	key->size -= pfxlen;
	/* For reverse keys the prefix is a suffix, so shrinking the size
	 * is enough; otherwise advance the data pointer past the prefix. */
	if (!F_ISSET(bt->flags, BT_REVERSEKEY))
		key->data = (char *)key->data + pfxlen;
}
419 
/* Reconstruct the full key stored at index <indx> of page <mp> by
 * combining the page's prefix with the node's (prefix-stripped) key,
 * storing the result in <expkey>.  Combination order is delegated to
 * concat_prefix(), which handles the reverse-key case.
 */
static void
expand_prefix(struct btree *bt, struct mpage *mp, indx_t indx,
    struct btkey *expkey)
{
	struct node	*node;

	node = NODEPTR(mp, indx);
	/* Pass the capacity in; concat_prefix() updates it to the
	 * resulting length. */
	expkey->len = sizeof(expkey->str);
	concat_prefix(bt, mp->prefix.str, mp->prefix.len,
	    NODEKEY(node), node->ksize, expkey->str, &expkey->len);
}
431 
432 static int
433 bt_cmp(struct btree *bt, const struct btval *key1, const struct btval *key2,
434     struct btkey *pfx)
435 {
436 	if (F_ISSET(bt->flags, BT_REVERSEKEY))
437 		return memnrcmp(key1->data, key1->size - pfx->len,
438 		    key2->data, key2->size);
439 	else
440 		return memncmp((char *)key1->data + pfx->len, key1->size - pfx->len,
441 		    key2->data, key2->size);
442 }
443 
444 void
445 btval_reset(struct btval *btv)
446 {
447 	if (btv) {
448 		if (btv->mp)
449 			btv->mp->ref--;
450 		if (btv->free_data)
451 			free(btv->data);
452 		memset(btv, 0, sizeof(*btv));
453 	}
454 }
455 
456 static int
457 mpage_cmp(struct mpage *a, struct mpage *b)
458 {
459 	if (a->pgno > b->pgno)
460 		return 1;
461 	if (a->pgno < b->pgno)
462 		return -1;
463 	return 0;
464 }
465 
/* Look up page <pgno> in the page cache.  On a hit, the page is moved
 * to the tail of the LRU queue (most recently used end) and the hit
 * counter is bumped.  Returns NULL when the page is not cached.
 */
static struct mpage *
mpage_lookup(struct btree *bt, pgno_t pgno)
{
	struct mpage	 find, *mp;

	/* Only the pgno field is read by mpage_cmp(). */
	find.pgno = pgno;
	mp = RB_FIND(page_cache, bt->page_cache, &find);
	if (mp) {
		bt->stat.hits++;
		/* Update LRU queue. Move page to the end. */
		TAILQ_REMOVE(bt->lru_queue, mp, lru_next);
		TAILQ_INSERT_TAIL(bt->lru_queue, mp, lru_next);
	}
	return mp;
}
481 
482 static void
483 mpage_add(struct btree *bt, struct mpage *mp)
484 {
485 	assert(RB_INSERT(page_cache, bt->page_cache, mp) == NULL);
486 	bt->stat.cache_size++;
487 	TAILQ_INSERT_TAIL(bt->lru_queue, mp, lru_next);
488 }
489 
490 static void
491 mpage_free(struct mpage *mp)
492 {
493 	if (mp != NULL) {
494 		free(mp->page);
495 		free(mp);
496 	}
497 }
498 
499 static void
500 mpage_del(struct btree *bt, struct mpage *mp)
501 {
502 	assert(RB_REMOVE(page_cache, bt->page_cache, mp) == mp);
503 	assert(bt->stat.cache_size > 0);
504 	bt->stat.cache_size--;
505 	TAILQ_REMOVE(bt->lru_queue, mp, lru_next);
506 }
507 
508 static void
509 mpage_flush(struct btree *bt)
510 {
511 	struct mpage	*mp;
512 
513 	while ((mp = RB_MIN(page_cache, bt->page_cache)) != NULL) {
514 		mpage_del(bt, mp);
515 		mpage_free(mp);
516 	}
517 }
518 
519 static struct mpage *
520 mpage_copy(struct btree *bt, struct mpage *mp)
521 {
522 	struct mpage	*copy;
523 
524 	if ((copy = calloc(1, sizeof(*copy))) == NULL)
525 		return NULL;
526 	if ((copy->page = malloc(bt->head.psize)) == NULL) {
527 		free(copy);
528 		return NULL;
529 	}
530 	bcopy(mp->page, copy->page, bt->head.psize);
531 	bcopy(&mp->prefix, &copy->prefix, sizeof(mp->prefix));
532 	copy->parent = mp->parent;
533 	copy->parent_index = mp->parent_index;
534 	copy->pgno = mp->pgno;
535 
536 	return copy;
537 }
538 
539 /* Remove the least recently used memory pages until the cache size is
540  * within the configured bounds. Pages referenced by cursors or returned
541  * key/data are not pruned.
542  */
543 static void
544 mpage_prune(struct btree *bt)
545 {
546 	struct mpage	*mp, *next;
547 
548 	for (mp = TAILQ_FIRST(bt->lru_queue); mp; mp = next) {
549 		if (bt->stat.cache_size <= bt->stat.max_cache)
550 			break;
551 		next = TAILQ_NEXT(mp, lru_next);
552 		if (!mp->dirty && mp->ref <= 0) {
553 			mpage_del(bt, mp);
554 			mpage_free(mp);
555 		}
556 	}
557 }
558 
559 /* Mark a page as dirty and push it on the dirty queue.
560  */
561 static void
562 mpage_dirty(struct btree *bt, struct mpage *mp)
563 {
564 	assert(bt != NULL);
565 	assert(bt->txn != NULL);
566 
567 	if (!mp->dirty) {
568 		mp->dirty = 1;
569 		SIMPLEQ_INSERT_TAIL(bt->txn->dirty_queue, mp, next);
570 	}
571 }
572 
/* Touch a page: make it dirty and re-insert into tree with updated pgno.
 * This is the copy-on-write step: the touched page gets a fresh page
 * number at the end of the file (bt->txn->next_pgno) so the old page
 * stays intact on disk.  Returns the (possibly copied) page, or NULL
 * on allocation failure.
 */
static struct mpage *
mpage_touch(struct btree *bt, struct mpage *mp)
{
	assert(bt != NULL);
	assert(bt->txn != NULL);
	assert(mp != NULL);

	if (!mp->dirty) {
		DPRINTF("touching page %u -> %u", mp->pgno, bt->txn->next_pgno);
		/* An unreferenced page can simply be re-keyed in place;
		 * a page still referenced by a cursor must be copied so
		 * the cursor keeps seeing the old version. */
		if (mp->ref == 0)
			mpage_del(bt, mp);
		else {
			if ((mp = mpage_copy(bt, mp)) == NULL)
				return NULL;
		}
		mp->pgno = mp->page->pgno = bt->txn->next_pgno++;
		mpage_dirty(bt, mp);
		mpage_add(bt, mp);

		/* Update the page number to new touched page. */
		if (mp->parent != NULL)
			NODEPGNO(NODEPTR(mp->parent,
			    mp->parent_index)) = mp->pgno;
	}

	return mp;
}
602 
/* Read page <pgno> from disk into <page>, a caller-supplied buffer of
 * bt->head.psize bytes.  Returns BT_SUCCESS, or BT_FAIL with errno set:
 * ENOENT if the page lies beyond end of file, EINVAL on a short read
 * or when the stored page number does not match <pgno>.
 */
static int
btree_read_page(struct btree *bt, pgno_t pgno, struct page *page)
{
	ssize_t		 rc;

	DPRINTF("reading page %u", pgno);
	bt->stat.reads++;
	if ((rc = pread(bt->fd, page, bt->head.psize, (off_t)pgno*bt->head.psize)) == 0) {
		DPRINTF("page %u doesn't exist", pgno);
		errno = ENOENT;
		return BT_FAIL;
	} else if (rc != (ssize_t)bt->head.psize) {
		/* rc > 0 is a short read; rc < 0 left errno from pread. */
		if (rc > 0)
			errno = EINVAL;
		DPRINTF("read: %s", strerror(errno));
		return BT_FAIL;
	}

	if (page->pgno != pgno) {
		DPRINTF("page numbers don't match: %u != %u", pgno, page->pgno);
		errno = EINVAL;
		return BT_FAIL;
	}

	DPRINTF("page %u has flags 0x%X", pgno, page->flags);

	return BT_SUCCESS;
}
631 
632 int
633 btree_sync(struct btree *bt)
634 {
635 	if (!F_ISSET(bt->flags, BT_NOSYNC))
636 		return fsync(bt->fd);
637 	return 0;
638 }
639 
/* Begin a transaction on <bt>.  A read-only transaction just snapshots
 * the current meta page.  A write transaction additionally allocates a
 * dirty queue, takes an exclusive flock() on the file and registers
 * itself as bt->txn; only one write transaction may be active at a
 * time.  Returns the new transaction, or NULL with errno set (EBUSY if
 * another writer holds the lock).
 */
struct btree_txn *
btree_txn_begin(struct btree *bt, int rdonly)
{
	struct btree_txn	*txn;

	if (!rdonly && bt->txn != NULL) {
		DPRINTF("write transaction already begun");
		errno = EBUSY;
		return NULL;
	}

	if ((txn = calloc(1, sizeof(*txn))) == NULL) {
		DPRINTF("calloc: %s", strerror(errno));
		return NULL;
	}

	if (rdonly) {
		txn->flags |= BT_TXN_RDONLY;
	} else {
		txn->dirty_queue = calloc(1, sizeof(*txn->dirty_queue));
		if (txn->dirty_queue == NULL) {
			free(txn);
			return NULL;
		}
		SIMPLEQ_INIT(txn->dirty_queue);

		DPRINTF("taking write lock on txn %p", txn);
		/* LOCK_NB: fail immediately instead of blocking when
		 * another process holds the lock. */
		if (flock(bt->fd, LOCK_EX | LOCK_NB) != 0) {
			DPRINTF("flock: %s", strerror(errno));
			errno = EBUSY;
			free(txn->dirty_queue);
			free(txn);
			return NULL;
		}
		bt->txn = txn;
	}

	txn->bt = bt;
	btree_ref(bt);

	/* Re-read the meta page in case another process grew the file;
	 * this also initializes txn->next_pgno.  On failure the abort
	 * path below releases the lock and the reference. */
	if (btree_read_meta(bt, &txn->next_pgno) != BT_SUCCESS) {
		btree_txn_abort(txn);
		return NULL;
	}

	txn->root = bt->meta.root;
	DPRINTF("begin transaction on btree %p, root page %u", bt, txn->root);

	return txn;
}
690 
/* Abort <txn>: for a write transaction, discard all dirty pages,
 * unregister it from the btree and release the flock(); finally drop
 * the transaction's reference on the btree (which may free it) and
 * free the transaction.  Safe to call with NULL.
 */
void
btree_txn_abort(struct btree_txn *txn)
{
	struct mpage	*mp;
	struct btree	*bt;

	if (txn == NULL)
		return;

	bt = txn->bt;
	DPRINTF("abort transaction on btree %p, root page %u", bt, txn->root);

	if (!F_ISSET(txn->flags, BT_TXN_RDONLY)) {
		/* Discard all dirty pages.
		 */
		while (!SIMPLEQ_EMPTY(txn->dirty_queue)) {
			mp = SIMPLEQ_FIRST(txn->dirty_queue);
			assert(mp->ref == 0);	/* cursors should be closed */
			mpage_del(bt, mp);
			SIMPLEQ_REMOVE_HEAD(txn->dirty_queue, next);
			mpage_free(mp);
		}

		DPRINTF("releasing write lock on txn %p", txn);
		txn->bt->txn = NULL;
		if (flock(txn->bt->fd, LOCK_UN) != 0) {
			DPRINTF("failed to unlock fd %d: %s",
			    txn->bt->fd, strerror(errno));
		}
		free(txn->dirty_queue);
	}

	/* Drops the reference taken in btree_txn_begin(). */
	btree_close(txn->bt);
	free(txn);
}
726 
/* Commit <txn>: append all dirty pages to the file in batches, then
 * write a new meta page bracketed by fsyncs so the meta page only
 * becomes current once the data pages are durable.  The transaction is
 * always consumed: it is aborted on every failure path and after a
 * successful commit.  Returns BT_SUCCESS or BT_FAIL with errno set.
 */
int
btree_txn_commit(struct btree_txn *txn)
{
	int		 n, done;
	ssize_t		 rc;
	off_t		 size;
	struct mpage	*mp;
	struct btree	*bt;
	struct iovec	 iov[BT_COMMIT_PAGES];

	assert(txn != NULL);
	assert(txn->bt != NULL);

	bt = txn->bt;

	if (F_ISSET(txn->flags, BT_TXN_RDONLY)) {
		DPRINTF("attempt to commit read-only transaction");
		btree_txn_abort(txn);
		errno = EPERM;
		return BT_FAIL;
	}

	if (txn != bt->txn) {
		DPRINTF("attempt to commit unknown transaction");
		btree_txn_abort(txn);
		errno = EINVAL;
		return BT_FAIL;
	}

	if (F_ISSET(txn->flags, BT_TXN_ERROR)) {
		DPRINTF("error flag is set, can't commit");
		btree_txn_abort(txn);
		errno = EINVAL;
		return BT_FAIL;
	}

	/* Nothing was modified: skip straight to cleanup. */
	if (SIMPLEQ_EMPTY(txn->dirty_queue))
		goto done;

	if (F_ISSET(bt->flags, BT_FIXPADDING)) {
		/* Pad the file to a page-size multiple so appended pages
		 * land on page boundaries.
		 * NOTE(review): an lseek() failure (-1) is not checked
		 * here; ftruncate() would then get a bogus size --
		 * confirm this cannot happen for a regular file fd. */
		size = lseek(bt->fd, 0, SEEK_END);
		size += bt->head.psize - (size % bt->head.psize);
		DPRINTF("extending to multiple of page size: %llu", size);
		if (ftruncate(bt->fd, size) != 0) {
			DPRINTF("ftruncate: %s", strerror(errno));
			btree_txn_abort(txn);
			return BT_FAIL;
		}
		bt->flags &= ~BT_FIXPADDING;
	}

	DPRINTF("committing transaction on btree %p, root page %u",
	    bt, txn->root);

	/* Commit up to BT_COMMIT_PAGES dirty pages to disk until done.
	 */
	do {
		n = 0;
		done = 1;
		SIMPLEQ_FOREACH(mp, txn->dirty_queue, next) {
			DPRINTF("committing page %u", mp->pgno);
			iov[n].iov_len = bt->head.psize;
			iov[n].iov_base = mp->page;
			if (++n >= BT_COMMIT_PAGES) {
				done = 0;
				break;
			}
		}

		if (n == 0)
			break;

		DPRINTF("committing %u dirty pages", n);
		/* The fd was switched to O_APPEND in btree_open_fd(), so
		 * writev() appends the batch at the end of the file. */
		rc = writev(bt->fd, iov, n);
		if (rc != (ssize_t)bt->head.psize*n) {
			if (rc > 0)
				DPRINTF("short write, filesystem full?");
			else
				DPRINTF("writev: %s", strerror(errno));
			btree_txn_abort(txn);
			return BT_FAIL;
		}

		/* Remove the dirty flag from the written pages.
		 */
		while (!SIMPLEQ_EMPTY(txn->dirty_queue)) {
			mp = SIMPLEQ_FIRST(txn->dirty_queue);
			mp->dirty = 0;
			SIMPLEQ_REMOVE_HEAD(txn->dirty_queue, next);
			if (--n == 0)
				break;
		}
	} while (!done);

	if (btree_sync(bt) != 0 ||
	    btree_write_meta(bt, txn->root, 0) != BT_SUCCESS ||
	    btree_sync(bt) != 0) {
		btree_txn_abort(txn);
		return BT_FAIL;
	}

done:
	mpage_prune(bt);
	btree_txn_abort(txn);

	return BT_SUCCESS;
}
834 
/* Write the initial header page to <fd> and cache the header contents
 * in bt->head.  The page size is taken from the filesystem's preferred
 * I/O block size (st_blksize), falling back to PAGESIZE.
 * NOTE(review): returns -1 on allocation failure but BT_FAIL on a
 * short write -- presumably BT_FAIL == -1; verify against btree.h.
 */
static int
btree_write_header(struct btree *bt, int fd)
{
	struct stat	 sb;
	struct bt_head	*h;
	struct page	*p;
	ssize_t		 rc;
	unsigned int	 psize;

	DPRINTF("writing header page");
	assert(bt != NULL);

	/* Ask stat for 'optimal blocksize for I/O'.
	 */
	if (fstat(fd, &sb) == 0)
		psize = sb.st_blksize;
	else
		psize = PAGESIZE;

	if ((p = calloc(1, psize)) == NULL)
		return -1;
	p->flags = P_HEAD;

	/* The header payload lives just after the page header. */
	h = METADATA(p);
	h->magic = BT_MAGIC;
	h->version = BT_VERSION;
	h->psize = psize;
	bcopy(h, &bt->head, sizeof(*h));

	rc = write(fd, p, bt->head.psize);
	free(p);
	if (rc != (ssize_t)bt->head.psize) {
		if (rc > 0)
			DPRINTF("short write, filesystem full?");
		return BT_FAIL;
	}

	return BT_SUCCESS;
}
874 
875 static int
876 btree_read_header(struct btree *bt)
877 {
878 	char		 page[PAGESIZE];
879 	struct page	*p;
880 	struct bt_head	*h;
881 	int		 rc;
882 
883 	assert(bt != NULL);
884 
885 	/* We don't know the page size yet, so use a minimum value.
886 	 */
887 
888 	if ((rc = pread(bt->fd, page, PAGESIZE, 0)) == 0) {
889 		errno = ENOENT;
890 		return -1;
891 	} else if (rc != PAGESIZE) {
892 		if (rc > 0)
893 			errno = EINVAL;
894 		DPRINTF("read: %s", strerror(errno));
895 		return -1;
896 	}
897 
898 	p = (struct page *)page;
899 
900 	if (!F_ISSET(p->flags, P_HEAD)) {
901 		DPRINTF("page %d not a header page", p->pgno);
902 		errno = EINVAL;
903 		return -1;
904 	}
905 
906 	h = METADATA(p);
907 	if (h->magic != BT_MAGIC) {
908 		DPRINTF("header has invalid magic");
909 		errno = EINVAL;
910 		return -1;
911 	}
912 
913 	if (h->version != BT_VERSION) {
914 		DPRINTF("database is version %u, expected version %u",
915 		    bt->head.version, BT_VERSION);
916 		errno = EINVAL;
917 		return -1;
918 	}
919 
920 	bcopy(h, &bt->head, sizeof(*h));
921 	return 0;
922 }
923 
/* Update the in-memory meta data (root, flags, timestamp, revision
 * count, SHA-1 digest) and append a new meta page containing it to the
 * file.  Requires an active write transaction.
 * NOTE(review): returns -1 on page allocation failure but BT_FAIL on a
 * short write -- presumably BT_FAIL == -1; verify against btree.h.
 */
static int
btree_write_meta(struct btree *bt, pgno_t root, unsigned int flags)
{
	struct mpage	*mp;
	struct bt_meta	*meta;
	ssize_t		 rc;

	DPRINTF("writing meta page for root page %u", root);

	assert(bt != NULL);
	assert(bt->txn != NULL);

	if ((mp = btree_new_page(bt, P_META)) == NULL)
		return -1;

	bt->meta.prev_meta = bt->meta.root;
	bt->meta.root = root;
	bt->meta.flags = flags;
	bt->meta.created_at = time(0);
	bt->meta.revisions++;
	/* Digest covers every field up to (but excluding) the hash. */
	SHA1((unsigned char *)&bt->meta, METAHASHLEN, bt->meta.hash);

	/* Copy the meta data changes to the new meta page. */
	meta = METADATA(mp->page);
	bcopy(&bt->meta, meta, sizeof(*meta));

	/* Write the meta page here directly (the fd is in O_APPEND
	 * mode) and take it back off the transaction's dirty queue. */
	rc = write(bt->fd, mp->page, bt->head.psize);
	mp->dirty = 0;
	SIMPLEQ_REMOVE_HEAD(bt->txn->dirty_queue, next);
	if (rc != (ssize_t)bt->head.psize) {
		if (rc > 0)
			DPRINTF("short write, filesystem full?");
		return BT_FAIL;
	}

	/* Remember the new file size; btree_read_meta() compares the
	 * current size against it to detect external changes. */
	if ((bt->size = lseek(bt->fd, 0, SEEK_END)) == -1) {
		DPRINTF("failed to update file size: %s", strerror(errno));
		bt->size = 0;
	}

	return BT_SUCCESS;
}
966 
/* Returns true if page p is a valid meta page, false otherwise.
 * A valid meta page carries the P_META flag, a root pointer that
 * references an earlier page (or P_INVALID for an empty tree), and a
 * SHA-1 digest matching its meta fields.  Sets errno = EINVAL on
 * failure.
 */
static int
btree_is_meta_page(struct page *p)
{
	struct bt_meta	*m;
	unsigned char	 hash[SHA_DIGEST_LENGTH];

	m = METADATA(p);
	if (!F_ISSET(p->flags, P_META)) {
		DPRINTF("page %d not a meta page", p->pgno);
		errno = EINVAL;
		return 0;
	}

	/* Pages are append-only, so a root must precede its meta page. */
	if (m->root >= p->pgno && m->root != P_INVALID) {
		DPRINTF("page %d points to an invalid root page", p->pgno);
		errno = EINVAL;
		return 0;
	}

	SHA1((unsigned char *)m, METAHASHLEN, hash);
	if (bcmp(hash, m->hash, SHA_DIGEST_LENGTH) != 0) {
		DPRINTF("page %d has an invalid digest", p->pgno);
		errno = EINVAL;
		return 0;
	}

	return 1;
}
997 
/* Locate and load the most recent valid meta page by scanning the file
 * backwards from the end.  Re-checks the file size so meta updates
 * made through other handles/processes are picked up.  If <p_next> is
 * non-NULL it receives the next unallocated page number (P_INVALID on
 * failure).  Returns BT_SUCCESS, or BT_FAIL with errno set (ESTALE if
 * the file carries a tombstone, EIO on corruption).
 */
static int
btree_read_meta(struct btree *bt, pgno_t *p_next)
{
	struct mpage	*mp;
	struct bt_meta	*meta;
	pgno_t		 meta_pgno, next_pgno;
	off_t		 size;

	assert(bt != NULL);

	if ((size = lseek(bt->fd, 0, SEEK_END)) == -1)
		goto fail;

	DPRINTF("btree_read_meta: size = %llu", size);

	/* The file is append-only; shrinking means something is wrong. */
	if (size < bt->size) {
		DPRINTF("file has shrunk!");
		errno = EIO;
		goto fail;
	}

	if (size == bt->head.psize) {		/* there is only the header */
		if (p_next != NULL)
			*p_next = 1;
		return BT_SUCCESS;		/* new file */
	}

	next_pgno = size / bt->head.psize;
	if (next_pgno == 0) {
		DPRINTF("corrupt file");
		errno = EIO;
		goto fail;
	}

	meta_pgno = next_pgno - 1;

	/* A partial trailing page will be padded out at next commit. */
	if (size % bt->head.psize != 0) {
		DPRINTF("filesize not a multiple of the page size!");
		bt->flags |= BT_FIXPADDING;
		next_pgno++;
	}

	if (p_next != NULL)
		*p_next = next_pgno;

	if (size == bt->size) {
		DPRINTF("size unchanged, keeping current meta page");
		if (F_ISSET(bt->meta.flags, BT_TOMBSTONE)) {
			DPRINTF("file is dead");
			errno = ESTALE;
			return BT_FAIL;
		} else
			return BT_SUCCESS;
	}
	bt->size = size;

	while (meta_pgno > 0) {
		if ((mp = btree_get_mpage(bt, meta_pgno)) == NULL)
			break;
		if (btree_is_meta_page(mp->page)) {
			meta = METADATA(mp->page);
			DPRINTF("flags = 0x%x", meta->flags);
			if (F_ISSET(meta->flags, BT_TOMBSTONE)) {
				DPRINTF("file is dead");
				errno = ESTALE;
				return BT_FAIL;
			} else {
				/* Make copy of last meta page. */
				bcopy(meta, &bt->meta, sizeof(bt->meta));
				return BT_SUCCESS;
			}
		}
		--meta_pgno;	/* scan backwards to first valid meta page */
	}

	errno = EIO;
fail:
	if (p_next != NULL)
		*p_next = P_INVALID;
	return BT_FAIL;
}
1079 
/* Create a btree handle from an already-open file descriptor.  The fd
 * is switched to O_APPEND mode (all page writes append to the file),
 * the page cache and LRU queue are allocated, and the header and meta
 * pages are read -- creating a fresh header when the file is empty.
 * Returns NULL with errno set on failure; the fd is not closed here.
 */
struct btree *
btree_open_fd(int fd, unsigned int flags)
{
	struct btree	*bt;
	int		 fl;

	/* NOTE(review): an fcntl(F_GETFL) failure (-1) is not checked
	 * separately; F_SETFL would then fail -- confirm acceptable. */
	fl = fcntl(fd, F_GETFL);
	if (fcntl(fd, F_SETFL, fl | O_APPEND) == -1)
		return NULL;

	if ((bt = calloc(1, sizeof(*bt))) == NULL)
		return NULL;
	bt->fd = fd;
	bt->flags = flags;
	bt->flags &= ~BT_FIXPADDING;	/* internal flag, never caller-set */
	bt->ref = 1;
	bt->meta.root = P_INVALID;

	if ((bt->page_cache = calloc(1, sizeof(*bt->page_cache))) == NULL)
		goto fail;
	bt->stat.max_cache = BT_MAXCACHE_DEF;
	RB_INIT(bt->page_cache);

	if ((bt->lru_queue = calloc(1, sizeof(*bt->lru_queue))) == NULL)
		goto fail;
	TAILQ_INIT(bt->lru_queue);

	/* ENOENT from btree_read_header() means an empty file: write a
	 * fresh header to create a new database. */
	if (btree_read_header(bt) != 0) {
		if (errno != ENOENT)
			goto fail;
		DPRINTF("new database");
		btree_write_header(bt, bt->fd);
	}

	if (btree_read_meta(bt, NULL) != 0)
		goto fail;

	DPRINTF("opened database version %u, pagesize %u",
	    bt->head.version, bt->head.psize);
	DPRINTF("timestamp: %s", ctime(&bt->meta.created_at));
	DPRINTF("depth: %u", bt->meta.depth);
	DPRINTF("entries: %llu", bt->meta.entries);
	DPRINTF("revisions: %u", bt->meta.revisions);
	DPRINTF("branch pages: %u", bt->meta.branch_pages);
	DPRINTF("leaf pages: %u", bt->meta.leaf_pages);
	DPRINTF("overflow pages: %u", bt->meta.overflow_pages);
	DPRINTF("root: %u", bt->meta.root);
	DPRINTF("previous meta page: %u", bt->meta.prev_meta);

	return bt;

fail:
	free(bt->lru_queue);
	free(bt->page_cache);
	free(bt);
	return NULL;
}
1137 
1138 struct btree *
1139 btree_open(const char *path, unsigned int flags, mode_t mode)
1140 {
1141 	int		 fd, oflags;
1142 	struct btree	*bt;
1143 
1144 	if (F_ISSET(flags, BT_RDONLY))
1145 		oflags = O_RDONLY;
1146 	else
1147 		oflags = O_RDWR | O_CREAT | O_APPEND;
1148 
1149 	if ((fd = open(path, oflags, mode)) == -1)
1150 		return NULL;
1151 
1152 	if ((bt = btree_open_fd(fd, flags)) == NULL)
1153 		close(fd);
1154 	else {
1155 		bt->path = strdup(path);
1156 		DPRINTF("opened btree %p", bt);
1157 	}
1158 
1159 	return bt;
1160 }
1161 
1162 static void
1163 btree_ref(struct btree *bt)
1164 {
1165 	bt->ref++;
1166 	DPRINTF("ref is now %d on btree %p", bt->ref, bt);
1167 }
1168 
1169 void
1170 btree_close(struct btree *bt)
1171 {
1172 	if (bt == NULL)
1173 		return;
1174 
1175 	if (--bt->ref == 0) {
1176 		DPRINTF("ref is zero, closing btree %p", bt);
1177 		close(bt->fd);
1178 		mpage_flush(bt);
1179 		free(bt->page_cache);
1180 		free(bt);
1181 	} else
1182 		DPRINTF("ref is now %d on btree %p", bt->ref, bt);
1183 }
1184 
1185 /* Search for key within a leaf page, using binary search.
1186  * Returns the smallest entry larger or equal to the key.
1187  * If exactp is non-null, stores whether the found entry was an exact match
1188  * in *exactp (1 or 0).
1189  * If kip is non-null, stores the index of the found entry in *kip.
 * If no entry larger or equal to the key is found, returns NULL.
1191  */
static struct node *
btree_search_node(struct btree *bt, struct mpage *mp, struct btval *key,
    int *exactp, unsigned int *kip)
{
	unsigned int	 i = 0;
	int		 low, high;
	int		 rc = 0;
	struct node	*node;
	struct btval	 nodekey;

	DPRINTF("searching %lu keys in %s page %u with prefix [%.*s]",
	    NUMKEYS(mp),
	    IS_LEAF(mp) ? "leaf" : "branch",
	    mp->pgno, (int)mp->prefix.len, (char *)mp->prefix.str);

	assert(NUMKEYS(mp) > 0);

	memset(&nodekey, 0, sizeof(nodekey));

	/* Standard binary search.  On branch pages the search starts at
	 * index 1: the first node is skipped. */
	low = IS_LEAF(mp) ? 0 : 1;
	high = NUMKEYS(mp) - 1;
	while (low <= high) {
		i = (low + high) >> 1;
		node = NODEPTR(mp, i);

		/* nodekey points straight into the page; no copy is made. */
		nodekey.size = node->ksize;
		nodekey.data = NODEKEY(node);

		/* A user-supplied comparator sees raw keys; the built-in
		 * comparison also accounts for the page's key prefix. */
		if (bt->cmp)
			rc = bt->cmp(key, &nodekey);
		else
			rc = bt_cmp(bt, key, &nodekey, &mp->prefix);

		if (IS_LEAF(mp))
			DPRINTF("found leaf index %u [%.*s], rc = %d",
			    i, (int)nodekey.size, (char *)nodekey.data, rc);
		else
			DPRINTF("found branch index %u [%.*s -> %u], rc = %d",
			    i, (int)node->ksize, (char *)NODEKEY(node),
			    node->n_pgno, rc);

		if (rc == 0)
			break;
		if (rc > 0)	/* key sorts after node i */
			low = i + 1;
		else
			high = i - 1;
	}

	if (rc > 0) {	/* Found entry is less than the key. */
		i++;	/* Skip to get the smallest entry larger than key. */
		if (i >= NUMKEYS(mp))
			/* There is no entry larger or equal to the key. */
			return NULL;
	}
	if (exactp)
		*exactp = (rc == 0);
	if (kip)	/* Store the key index if requested. */
		*kip = i;

	return NODEPTR(mp, i);
}
1254 
1255 static void
1256 cursor_pop_page(struct cursor *cursor)
1257 {
1258 	struct ppage	*top;
1259 
1260 	top = CURSOR_TOP(cursor);
1261 	CURSOR_POP(cursor);
1262 	top->mpage->ref--;
1263 
1264 	DPRINTF("popped page %u off cursor %p", top->mpage->pgno, cursor);
1265 
1266 	free(top);
1267 }
1268 
1269 static struct ppage *
1270 cursor_push_page(struct cursor *cursor, struct mpage *mp)
1271 {
1272 	struct ppage	*ppage;
1273 
1274 	DPRINTF("pushing page %u on cursor %p", mp->pgno, cursor);
1275 
1276 	if ((ppage = calloc(1, sizeof(*ppage))) == NULL)
1277 		return NULL;
1278 	ppage->mpage = mp;
1279 	mp->ref++;
1280 	CURSOR_PUSH(cursor, ppage);
1281 	return ppage;
1282 }
1283 
1284 static struct mpage *
1285 btree_get_mpage(struct btree *bt, pgno_t pgno)
1286 {
1287 	struct mpage	*mp;
1288 
1289 	mp = mpage_lookup(bt, pgno);
1290 	if (mp == NULL) {
1291 		if ((mp = calloc(1, sizeof(*mp))) == NULL)
1292 			return NULL;
1293 		if ((mp->page = malloc(bt->head.psize)) == NULL) {
1294 			free(mp);
1295 			return NULL;
1296 		}
1297 		if (btree_read_page(bt, pgno, mp->page) != BT_SUCCESS) {
1298 			mpage_free(mp);
1299 			return NULL;
1300 		}
1301 		mp->pgno = pgno;
1302 		mpage_add(bt, mp);
1303 	} else
1304 		DPRINTF("returning page %u from cache", pgno);
1305 
1306 	return mp;
1307 }
1308 
1309 static void
1310 concat_prefix(struct btree *bt, char *s1, size_t n1, char *s2, size_t n2,
1311     char *cs, size_t *cn)
1312 {
1313 	assert(*cn >= n1 + n2);
1314 	if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
1315 		bcopy(s2, cs, n2);
1316 		bcopy(s1, cs + n2, n1);
1317 	} else {
1318 		bcopy(s1, cs, n1);
1319 		bcopy(s2, cs + n1, n2);
1320 	}
1321 	*cn = n1 + n2;
1322 }
1323 
/* Compute the key prefix shared by all keys on page mp and store it in
 * mp->prefix.  The prefix is derived from the nearest separator keys to
 * the left and right of the page's subtree, found by walking up the
 * parent chain.  No prefix is used when a custom comparator is set,
 * since only bt_cmp() understands prefix compression.
 */
static void
find_common_prefix(struct btree *bt, struct mpage *mp)
{
	indx_t			 lbound = 0, ubound = 0;
	struct mpage		*lp, *up;
	struct btkey		 lprefix, uprefix;

	mp->prefix.len = 0;
	if (bt->cmp != NULL)
		return;

	/* Find the closest ancestor with a separator to our left. */
	lp = mp;
	while (lp->parent != NULL) {
		if (lp->parent_index > 0) {
			lbound = lp->parent_index;
			break;
		}
		lp = lp->parent;
	}

	/* Find the closest ancestor with a separator to our right. */
	up = mp;
	while (up->parent != NULL) {
		if (up->parent_index + 1 < (indx_t)NUMKEYS(up->parent)) {
			ubound = up->parent_index + 1;
			break;
		}
		up = up->parent;
	}

	if (lp->parent != NULL && up->parent != NULL) {
		/* Bounded on both sides: the page prefix is whatever the
		 * two bounding separators have in common. */
		expand_prefix(bt, lp->parent, lbound, &lprefix);
		expand_prefix(bt, up->parent, ubound, &uprefix);
		common_prefix(bt, &lprefix, &uprefix, &mp->prefix);
	}
	else if (mp->parent)
		/* Leftmost or rightmost edge of the tree: inherit the
		 * parent's prefix unchanged. */
		bcopy(&mp->parent->prefix, &mp->prefix, sizeof(mp->prefix));

	DPRINTF("found common prefix [%.*s] (len %zu) for page %u",
	    (int)mp->prefix.len, mp->prefix.str, mp->prefix.len, mp->pgno);
}
1364 
/* Descend from the given root page to the leaf page that should contain
 * key, choosing one child per branch level.  See btree_search_page()
 * for the meaning of the cursor and modify arguments.
 */
static int
btree_search_page_root(struct btree *bt, struct mpage *root, struct btval *key,
    struct cursor *cursor, int modify, struct mpage **mpp)
{
	struct mpage	*mp, *parent;

	if (cursor && cursor_push_page(cursor, root) == NULL)
		return BT_FAIL;

	mp = root;
	while (IS_BRANCH(mp)) {
		unsigned int	 i = 0;
		struct node	*node;

		DPRINTF("branch page %u has %lu keys", mp->pgno, NUMKEYS(mp));
		assert(NUMKEYS(mp) > 1);
		DPRINTF("found index 0 to page %u", NODEPGNO(NODEPTR(mp, 0)));

		if (key == NULL)	/* Initialize cursor to first page. */
			i = 0;
		else {
			int	 exact;
			/* Find the smallest separator >= key ... */
			node = btree_search_node(bt, mp, key, &exact, &i);
			if (node == NULL)
				/* ... none: key is beyond all separators,
				 * follow the rightmost child. */
				i = NUMKEYS(mp) - 1;
			else if (!exact) {
				assert(i > 0);
				/* ... inexact: step back to the child whose
				 * range covers the key. */
				i--;
			}
		}

		if (key)
			DPRINTF("following index %u for key %.*s",
			    i, (int)key->size, (char *)key->data);
		assert(i >= 0 && i < NUMKEYS(mp));
		node = NODEPTR(mp, i);

		if (cursor)
			CURSOR_TOP(cursor)->ki = i;

		parent = mp;
		if ((mp = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
			return BT_FAIL;
		/* Link the child to its parent so prefix computation and
		 * later rebalancing can walk upwards. */
		mp->parent = parent;
		mp->parent_index = i;
		find_common_prefix(bt, mp);

		if (cursor && cursor_push_page(cursor, mp) == NULL)
			return BT_FAIL;

		/* When modifying, shadow the page first (mpage_touch may
		 * return a different, writable copy). */
		if (modify && (mp = mpage_touch(bt, mp)) == NULL)
			return BT_FAIL;
	}

	if (!IS_LEAF(mp)) {
		DPRINTF("internal error, index points to a %02X page!?",
		    mp->page->flags);
		return BT_FAIL;
	}

	DPRINTF("found leaf page %u for key %.*s", mp->pgno,
	    key ? (int)key->size : 0, key ? (char *)key->data : NULL);

	*mpp = mp;
	return BT_SUCCESS;
}
1431 
1432 /* Search for the page a given key should be in.
1433  * Stores a pointer to the found page in *mpp.
1434  * If key is NULL, search for the lowest page (used by btree_cursor_first).
1435  * If cursor is non-null, pushes parent pages on the cursor stack.
1436  * If modify is true, visited pages are updated with new page numbers.
1437  */
1438 static int
1439 btree_search_page(struct btree *bt, struct btree_txn *txn, struct btval *key,
1440     struct cursor *cursor, int modify, struct mpage **mpp)
1441 {
1442 	int		 rc;
1443 	pgno_t		 root;
1444 	struct mpage	*mp;
1445 
1446 	/* Can't modify pages outside a transaction. */
1447 	if (txn == NULL && modify) {
1448 		errno = EINVAL;
1449 		return BT_FAIL;
1450 	}
1451 
1452 	/* Choose which root page to start with. If a transaction is given
1453          * use the root page from the transaction, otherwise read the last
1454          * committed root page.
1455 	 */
1456 	if (txn == NULL) {
1457 		if ((rc = btree_read_meta(bt, NULL)) != BT_SUCCESS)
1458 			return rc;
1459 		root = bt->meta.root;
1460 	} else if (F_ISSET(txn->flags, BT_TXN_ERROR)) {
1461 		DPRINTF("transaction has failed, must abort");
1462 		errno = EINVAL;
1463 		return BT_FAIL;
1464 	} else
1465 		root = txn->root;
1466 
1467 	if (root == P_INVALID) {		/* Tree is empty. */
1468 		DPRINTF("tree is empty");
1469 		errno = ENOENT;
1470 		return BT_FAIL;
1471 	}
1472 
1473 	if ((mp = btree_get_mpage(bt, root)) == NULL)
1474 		return BT_FAIL;
1475 
1476 	DPRINTF("root page has flags 0x%X", mp->page->flags);
1477 
1478 	assert(mp->parent == NULL);
1479 	assert(mp->prefix.len == 0);
1480 
1481 	if (modify && !mp->dirty) {
1482 		if ((mp = mpage_touch(bt, mp)) == NULL)
1483 			return BT_FAIL;
1484 		txn->root = mp->pgno;
1485 	}
1486 
1487 	return btree_search_page_root(bt, mp, key, cursor, modify, mpp);
1488 }
1489 
/* Fill in *data with the data stored in leaf node "leaf".  Inline data
 * is either copied (when mp == NULL) or returned as a pointer into the
 * page, bumping the page's reference count.  Big data is reassembled
 * from the chain of overflow pages.  Returns BT_SUCCESS or BT_FAIL.
 */
static int
btree_read_data(struct btree *bt, struct mpage *mp, struct node *leaf,
    struct btval *data)
{
	struct mpage	*omp;		/* overflow mpage */
	size_t		 psz;
	size_t		 max;		/* payload bytes per overflow page */
	size_t		 sz = 0;
	pgno_t		 pgno;

	memset(data, 0, sizeof(*data));
	max = bt->head.psize - PAGEHDRSZ;

	if (!F_ISSET(leaf->flags, F_BIGDATA)) {
		data->size = leaf->n_dsize;
		if (data->size > 0) {
			if (mp == NULL) {
				/* No page to pin: hand out a private copy
				 * the caller must free. */
				if ((data->data = malloc(data->size)) == NULL)
					return BT_FAIL;
				bcopy(NODEDATA(leaf), data->data, data->size);
				data->free_data = 1;
				data->mp = NULL;
			} else {
				/* Point into the page and keep it referenced
				 * until the caller releases the btval. */
				data->data = NODEDATA(leaf);
				data->free_data = 0;
				data->mp = mp;
				mp->ref++;
			}
		}
		return BT_SUCCESS;
	}

	/* Read overflow data.
	 */
	DPRINTF("allocating %u byte for overflow data", leaf->n_dsize);
	if ((data->data = malloc(leaf->n_dsize)) == NULL)
		return BT_FAIL;
	data->size = leaf->n_dsize;
	data->free_data = 1;
	data->mp = NULL;
	/* The node stores only the first overflow page number. */
	bcopy(NODEDATA(leaf), &pgno, sizeof(pgno));
	for (sz = 0; sz < data->size; ) {
		if ((omp = btree_get_mpage(bt, pgno)) == NULL ||
		    !F_ISSET(omp->page->flags, P_OVERFLOW)) {
			DPRINTF("read overflow page %u failed", pgno);
			free(data->data);
			/* NOTE(review): omp is NULL here when
			 * btree_get_mpage() failed -- confirm mpage_free()
			 * tolerates a NULL argument. */
			mpage_free(omp);
			return BT_FAIL;
		}
		psz = data->size - sz;
		if (psz > max)
			psz = max;
		bcopy(omp->page->ptrs, (char *)data->data + sz, psz);
		sz += psz;
		pgno = omp->page->p_next_pgno;	/* follow the chain */
	}

	return BT_SUCCESS;
}
1549 
1550 int
1551 btree_txn_get(struct btree *bt, struct btree_txn *txn,
1552     struct btval *key, struct btval *data)
1553 {
1554 	int		 rc, exact;
1555 	struct node	*leaf;
1556 	struct mpage	*mp;
1557 
1558 	assert(key);
1559 	assert(data);
1560 	DPRINTF("===> get key [%.*s]", (int)key->size, (char *)key->data);
1561 
1562 	if (bt != NULL && txn != NULL && bt != txn->bt) {
1563 		errno = EINVAL;
1564 		return BT_FAIL;
1565 	}
1566 
1567 	if (bt == NULL) {
1568 		if (txn == NULL) {
1569 			errno = EINVAL;
1570 			return BT_FAIL;
1571 		}
1572 		bt = txn->bt;
1573 	}
1574 
1575 	if (key->size == 0 || key->size > MAXKEYSIZE) {
1576 		errno = EINVAL;
1577 		return BT_FAIL;
1578 	}
1579 
1580 	if ((rc = btree_search_page(bt, txn, key, NULL, 0, &mp)) != BT_SUCCESS)
1581 		return rc;
1582 
1583 	leaf = btree_search_node(bt, mp, key, &exact, NULL);
1584 	if (leaf && exact)
1585 		rc = btree_read_data(bt, mp, leaf, data);
1586 	else {
1587 		errno = ENOENT;
1588 		rc = BT_FAIL;
1589 	}
1590 
1591 	mpage_prune(bt);
1592 	return rc;
1593 }
1594 
1595 static int
1596 btree_sibling(struct cursor *cursor, int move_right)
1597 {
1598 	int		 rc;
1599 	struct node	*indx;
1600 	struct ppage	*parent, *top;
1601 	struct mpage	*mp;
1602 
1603 	top = CURSOR_TOP(cursor);
1604 	if ((parent = SLIST_NEXT(top, entry)) == NULL) {
1605 		errno = ENOENT;
1606 		return BT_FAIL;			/* root has no siblings */
1607 	}
1608 
1609 	DPRINTF("parent page is page %u, index %u",
1610 	    parent->mpage->pgno, parent->ki);
1611 
1612 	cursor_pop_page(cursor);
1613 	if (move_right ? (parent->ki + 1 >= NUMKEYS(parent->mpage))
1614 		       : (parent->ki == 0)) {
1615 		DPRINTF("no more keys left, moving to %s sibling",
1616 		    move_right ? "right" : "left");
1617 		if ((rc = btree_sibling(cursor, move_right)) != BT_SUCCESS)
1618 			return rc;
1619 		parent = CURSOR_TOP(cursor);
1620 	} else {
1621 		if (move_right)
1622 			parent->ki++;
1623 		else
1624 			parent->ki--;
1625 		DPRINTF("just moving to %s index key %u",
1626 		    move_right ? "right" : "left", parent->ki);
1627 	}
1628 	assert(IS_BRANCH(parent->mpage));
1629 
1630 	indx = NODEPTR(parent->mpage, parent->ki);
1631 	if ((mp = btree_get_mpage(cursor->bt, indx->n_pgno)) == NULL)
1632 		return BT_FAIL;
1633 	mp->parent = parent->mpage;
1634 	mp->parent_index = parent->ki;
1635 
1636 	cursor_push_page(cursor, mp);
1637 	find_common_prefix(cursor->bt, mp);
1638 
1639 	return BT_SUCCESS;
1640 }
1641 
1642 static int
1643 bt_set_key(struct btree *bt, struct mpage *mp, struct node *node,
1644     struct btval *key)
1645 {
1646 	if (key == NULL)
1647 		return 0;
1648 
1649 	if (mp->prefix.len > 0) {
1650 		key->size = node->ksize + mp->prefix.len;
1651 		key->data = malloc(key->size);
1652 		if (key->data == NULL)
1653 			return -1;
1654 		concat_prefix(bt,
1655 		    mp->prefix.str, mp->prefix.len,
1656 		    NODEKEY(node), node->ksize,
1657 		    key->data, &key->size);
1658 		key->free_data = 1;
1659 	} else {
1660 		key->size = node->ksize;
1661 		key->data = NODEKEY(node);
1662 		key->free_data = 0;
1663 		key->mp = mp;
1664 		mp->ref++;
1665 	}
1666 
1667 	return 0;
1668 }
1669 
1670 static int
1671 btree_cursor_next(struct cursor *cursor, struct btval *key, struct btval *data)
1672 {
1673 	struct ppage	*top;
1674 	struct mpage	*mp;
1675 	struct node	*leaf;
1676 
1677 	if (cursor->eof) {
1678 		errno = ENOENT;
1679 		return BT_FAIL;
1680 	}
1681 
1682 	assert(cursor->initialized);
1683 
1684 	top = CURSOR_TOP(cursor);
1685 	mp = top->mpage;
1686 
1687 	DPRINTF("cursor_next: top page is %u in cursor %p", mp->pgno, cursor);
1688 
1689 	if (top->ki + 1 >= NUMKEYS(mp)) {
1690 		DPRINTF("=====> move to next sibling page");
1691 		if (btree_sibling(cursor, 1) != BT_SUCCESS) {
1692 			cursor->eof = 1;
1693 			return BT_FAIL;
1694 		}
1695 		top = CURSOR_TOP(cursor);
1696 		mp = top->mpage;
1697 		DPRINTF("next page is %u, key index %u", mp->pgno, top->ki);
1698 	} else
1699 		top->ki++;
1700 
1701 	DPRINTF("==> cursor points to page %u with %lu keys, key index %u",
1702 	    mp->pgno, NUMKEYS(mp), top->ki);
1703 
1704 	assert(IS_LEAF(mp));
1705 	leaf = NODEPTR(mp, top->ki);
1706 
1707 	if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1708 		return BT_FAIL;
1709 
1710 	if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1711 		return BT_FAIL;
1712 
1713 	return BT_SUCCESS;
1714 }
1715 
1716 static int
1717 btree_cursor_set(struct cursor *cursor, struct btval *key, struct btval *data,
1718     int *exactp)
1719 {
1720 	int		 rc;
1721 	struct node	*leaf;
1722 	struct mpage	*mp;
1723 	struct ppage	*top;
1724 
1725 	assert(cursor);
1726 	assert(key);
1727 	assert(key->size > 0);
1728 
1729 	rc = btree_search_page(cursor->bt, cursor->txn, key, cursor, 0, &mp);
1730 	if (rc != BT_SUCCESS)
1731 		return rc;
1732 	assert(IS_LEAF(mp));
1733 
1734 	top = CURSOR_TOP(cursor);
1735 	leaf = btree_search_node(cursor->bt, mp, key, exactp, &top->ki);
1736 	if (exactp != NULL && !*exactp) {
1737 		/* BT_CURSOR_EXACT specified and not an exact match. */
1738 		errno = ENOENT;
1739 		return BT_FAIL;
1740 	}
1741 
1742 	if (leaf == NULL) {
1743 		DPRINTF("===> inexact leaf not found, goto sibling");
1744 		if (btree_sibling(cursor, 1) != BT_SUCCESS)
1745 			return BT_FAIL;		/* no entries matched */
1746 		top = CURSOR_TOP(cursor);
1747 		top->ki = 0;
1748 		mp = top->mpage;
1749 		assert(IS_LEAF(mp));
1750 		leaf = NODEPTR(mp, 0);
1751 	}
1752 
1753 	cursor->initialized = 1;
1754 	cursor->eof = 0;
1755 
1756 	if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1757 		return BT_FAIL;
1758 
1759 	if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1760 		return BT_FAIL;
1761 	DPRINTF("==> cursor placed on key %.*s",
1762 	    (int)key->size, (char *)key->data);
1763 
1764 	return BT_SUCCESS;
1765 }
1766 
1767 static int
1768 btree_cursor_first(struct cursor *cursor, struct btval *key, struct btval *data)
1769 {
1770 	int		 rc;
1771 	struct mpage	*mp;
1772 	struct node	*leaf;
1773 
1774 	rc = btree_search_page(cursor->bt, cursor->txn, NULL, cursor, 0, &mp);
1775 	if (rc != BT_SUCCESS)
1776 		return rc;
1777 	assert(IS_LEAF(mp));
1778 
1779 	leaf = NODEPTR(mp, 0);
1780 	cursor->initialized = 1;
1781 	cursor->eof = 0;
1782 
1783 	if (data && btree_read_data(cursor->bt, mp, leaf, data) != BT_SUCCESS)
1784 		return BT_FAIL;
1785 
1786 	if (bt_set_key(cursor->bt, mp, leaf, key) != 0)
1787 		return BT_FAIL;
1788 
1789 	return BT_SUCCESS;
1790 }
1791 
1792 int
1793 btree_cursor_get(struct cursor *cursor, struct btval *key, struct btval *data,
1794     enum cursor_op op)
1795 {
1796 	int		 rc;
1797 	int		 exact = 0;
1798 
1799 	assert(cursor);
1800 
1801 	switch (op) {
1802 	case BT_CURSOR:
1803 	case BT_CURSOR_EXACT:
1804 		while (CURSOR_TOP(cursor) != NULL)
1805 			cursor_pop_page(cursor);
1806 		if (key == NULL || key->size == 0 || key->size > MAXKEYSIZE) {
1807 			errno = EINVAL;
1808 			rc = BT_FAIL;
1809 		} else if (op == BT_CURSOR_EXACT)
1810 			rc = btree_cursor_set(cursor, key, data, &exact);
1811 		else
1812 			rc = btree_cursor_set(cursor, key, data, NULL);
1813 		break;
1814 	case BT_NEXT:
1815 		if (!cursor->initialized)
1816 			rc = btree_cursor_first(cursor, key, data);
1817 		else
1818 			rc = btree_cursor_next(cursor, key, data);
1819 		break;
1820 	case BT_FIRST:
1821 		while (CURSOR_TOP(cursor) != NULL)
1822 			cursor_pop_page(cursor);
1823 		rc = btree_cursor_first(cursor, key, data);
1824 		break;
1825 	default:
1826 		DPRINTF("unhandled/unimplemented cursor operation %u", op);
1827 		rc = BT_FAIL;
1828 		break;
1829 	}
1830 
1831 	mpage_prune(cursor->bt);
1832 
1833 	return rc;
1834 }
1835 
1836 static struct mpage *
1837 btree_new_page(struct btree *bt, uint32_t flags)
1838 {
1839 	struct mpage	*mp;
1840 
1841 	assert(bt != NULL);
1842 	assert(bt->txn != NULL);
1843 
1844 	DPRINTF("allocating new mpage %u, page size %u",
1845 	    bt->txn->next_pgno, bt->head.psize);
1846 	if ((mp = calloc(1, sizeof(*mp))) == NULL)
1847 		return NULL;
1848 	if ((mp->page = malloc(bt->head.psize)) == NULL) {
1849 		free(mp);
1850 		return NULL;
1851 	}
1852 	mp->pgno = mp->page->pgno = bt->txn->next_pgno++;
1853 	mp->page->flags = flags;
1854 	mp->page->lower = PAGEHDRSZ;
1855 	mp->page->upper = bt->head.psize;
1856 
1857 	if (IS_BRANCH(mp))
1858 		bt->meta.branch_pages++;
1859 	else if (IS_LEAF(mp))
1860 		bt->meta.leaf_pages++;
1861 	else if (IS_OVERFLOW(mp))
1862 		bt->meta.overflow_pages++;
1863 
1864 	mpage_add(bt, mp);
1865 	mpage_dirty(bt, mp);
1866 
1867 	return mp;
1868 }
1869 
1870 static size_t
1871 bt_leaf_size(struct btree *bt, struct btval *key, struct btval *data)
1872 {
1873 	size_t		 sz;
1874 
1875 	sz = LEAFSIZE(key, data);
1876 	if (data->size >= bt->head.psize / BT_MINKEYS) {
1877 		/* put on overflow page */
1878 		sz -= data->size - sizeof(pgno_t);
1879 	}
1880 
1881 	return sz + sizeof(indx_t);
1882 }
1883 
1884 static size_t
1885 bt_branch_size(struct btree *bt, struct btval *key)
1886 {
1887 	size_t		 sz;
1888 
1889 	sz = INDXSIZE(key);
1890 	if (sz >= bt->head.psize / BT_MINKEYS) {
1891 		/* put on overflow page */
1892 		/* not implemented */
1893 		/* sz -= key->size - sizeof(pgno_t); */
1894 	}
1895 
1896 	return sz + sizeof(indx_t);
1897 }
1898 
/* Write data->size bytes of payload into a chain of overflow pages,
 * starting with page p (already allocated by the caller).  Additional
 * pages are allocated as needed and linked through p_next_pgno; a link
 * value of 0 terminates the chain.
 */
static int
btree_write_overflow_data(struct btree *bt, struct page *p, struct btval *data)
{
	size_t		 done = 0;	/* bytes copied so far */
	size_t		 sz;
	size_t		 max;		/* payload capacity per page */
	pgno_t		*linkp;			/* linked page stored here */
	struct mpage	*next = NULL;

	max = bt->head.psize - PAGEHDRSZ;

	while (done < data->size) {
		if (next != NULL)
			p = next->page;	/* page allocated last iteration */
		linkp = &p->p_next_pgno;
		if (data->size - done > max) {
			/* need another overflow page */
			if ((next = btree_new_page(bt, P_OVERFLOW)) == NULL)
				return BT_FAIL;
			*linkp = next->pgno;
			DPRINTF("linking overflow page %u", next->pgno);
		} else
			*linkp = 0;		/* indicates end of list */
		sz = data->size - done;
		if (sz > max)
			sz = max;
		DPRINTF("copying %zu bytes to overflow page %u", sz, p->pgno);
		bcopy((char *)data->data + done, p->ptrs, sz);
		done += sz;
	}

	return BT_SUCCESS;
}
1932 
1933 /* Key prefix should already be stripped.
1934  */
/* Insert a new node at index indx on page mp.  For leaf pages both key
 * and data are stored (large data spills to an overflow page); for
 * branch pages the key and the child page number pgno are stored.
 * Returns BT_FAIL when the page is full; the caller is expected to
 * split the page in that case.
 */
static int
btree_add_node(struct btree *bt, struct mpage *mp, indx_t indx,
    struct btval *key, struct btval *data, pgno_t pgno, uint8_t flags)
{
	unsigned int	 i;
	size_t		 node_size = NODESIZE;
	indx_t		 ofs;
	struct node	*node;
	struct page	*p;
	struct mpage	*ofp = NULL;		/* overflow page */

	p = mp->page;
	assert(p->upper >= p->lower);

	DPRINTF("add node [%.*s] to %s page %u at index %d, key size %zu",
	    key ? (int)key->size : 0, key ? (char *)key->data : NULL,
	    IS_LEAF(mp) ? "leaf" : "branch",
	    mp->pgno, indx, key ? key->size : 0);

	if (key != NULL)
		node_size += key->size;

	if (IS_LEAF(mp)) {
		assert(data);
		node_size += data->size;
		if (F_ISSET(flags, F_BIGDATA)) {
			/* Data already on overflow page. */
			node_size -= data->size - sizeof(pgno_t);
		} else if (data->size >= bt->head.psize / BT_MINKEYS) {
			/* Put data on overflow page. */
			DPRINTF("data size is %zu, put on overflow page",
			    data->size);
			node_size -= data->size - sizeof(pgno_t);
			if ((ofp = btree_new_page(bt, P_OVERFLOW)) == NULL)
				return BT_FAIL;
			DPRINTF("allocated overflow page %u", ofp->pgno);
			flags |= F_BIGDATA;
		}
	}

	/* The node plus its index slot must fit in the free gap. */
	if (node_size + sizeof(indx_t) > SIZELEFT(mp)) {
		DPRINTF("not enough room in page %u, got %lu ptrs",
		    mp->pgno, NUMKEYS(mp));
		DPRINTF("upper - lower = %u - %u = %u", p->upper, p->lower,
		    p->upper - p->lower);
		DPRINTF("node size = %zu", node_size);
		return BT_FAIL;
	}

	/* Move higher pointers up one slot. */
	for (i = NUMKEYS(mp); i > indx; i--)
		p->ptrs[i] = p->ptrs[i - 1];

	/* Adjust free space offsets. */
	ofs = p->upper - node_size;
	assert(ofs >= p->lower + sizeof(indx_t));
	p->ptrs[indx] = ofs;
	p->upper = ofs;
	p->lower += sizeof(indx_t);

	/* Write the node data. */
	node = NODEPTR(mp, indx);
	node->ksize = (key == NULL) ? 0 : key->size;
	node->flags = flags;
	if (IS_LEAF(mp))
		node->n_dsize = data->size;
	else
		node->n_pgno = pgno;

	if (key)
		bcopy(key->data, NODEKEY(node), key->size);

	if (IS_LEAF(mp)) {
		assert(key);
		if (ofp == NULL) {
			if (F_ISSET(flags, F_BIGDATA))
				/* data->data holds the overflow page number
				 * supplied by the caller. */
				bcopy(data->data, node->data + key->size,
				    sizeof(pgno_t));
			else
				bcopy(data->data, node->data + key->size,
				    data->size);
		} else {
			/* Link the freshly allocated overflow page, then
			 * spill the data into it. */
			bcopy(&ofp->pgno, node->data + key->size,
			    sizeof(pgno_t));
			if (btree_write_overflow_data(bt, ofp->page,
			    data) == BT_FAIL)
				return BT_FAIL;
		}
	}

	return BT_SUCCESS;
}
2027 
/* Remove node indx from page mp, compacting the page in place: the
 * index slot is dropped and the packed node area is shifted to close
 * the gap left by the deleted node.
 */
static void
btree_del_node(struct btree *bt, struct mpage *mp, indx_t indx)
{
	unsigned int	 sz;
	indx_t		 i, j, numkeys, ptr;
	struct node	*node;
	char		*base;

	DPRINTF("delete node %u on %s page %u", indx,
	    IS_LEAF(mp) ? "leaf" : "branch", mp->pgno);
	assert(indx < NUMKEYS(mp));

	/* Total bytes occupied by the node being removed. */
	node = NODEPTR(mp, indx);
	sz = NODESIZE + node->ksize;
	if (IS_LEAF(mp)) {
		if (F_ISSET(node->flags, F_BIGDATA))
			sz += sizeof(pgno_t);
		else
			sz += NODEDSZ(node);
	}

	/* Drop the index slot.  Nodes stored at lower offsets than the
	 * deleted one move up by sz bytes, so adjust their offsets. */
	ptr = mp->page->ptrs[indx];
	numkeys = NUMKEYS(mp);
	for (i = j = 0; i < numkeys; i++) {
		if (i != indx) {
			mp->page->ptrs[j] = mp->page->ptrs[i];
			if (mp->page->ptrs[i] < ptr)
				mp->page->ptrs[j] += sz;
			j++;
		}
	}

	/* Slide the node area over the deleted node (regions overlap;
	 * bcopy handles overlapping copies). */
	base = (char *)mp->page + mp->page->upper;
	bcopy(base, base + sz, ptr - mp->page->upper);

	mp->page->lower -= sizeof(indx_t);
	mp->page->upper += sz;
}
2066 
2067 struct cursor *
2068 btree_txn_cursor_open(struct btree *bt, struct btree_txn *txn)
2069 {
2070 	struct cursor	*cursor;
2071 
2072 	if (bt != NULL && txn != NULL && bt != txn->bt) {
2073 		errno = EINVAL;
2074 		return NULL;
2075 	}
2076 
2077 	if (bt == NULL) {
2078 		if (txn == NULL) {
2079 			errno = EINVAL;
2080 			return NULL;
2081 		}
2082 		bt = txn->bt;
2083 	}
2084 
2085 	if ((cursor = calloc(1, sizeof(*cursor))) != NULL) {
2086 		SLIST_INIT(&cursor->stack);
2087 		cursor->bt = bt;
2088 		cursor->txn = txn;
2089 		btree_ref(bt);
2090 	}
2091 
2092 	return cursor;
2093 }
2094 
2095 void
2096 btree_cursor_close(struct cursor *cursor)
2097 {
2098 	if (cursor != NULL) {
2099 		while (!CURSOR_EMPTY(cursor))
2100 			cursor_pop_page(cursor);
2101 
2102 		btree_close(cursor->bt);
2103 		free(cursor);
2104 	}
2105 }
2106 
/* Replace the key of node indx on page mp with *key, resizing the node
 * in place when the key length changes.  Returns BT_FAIL if the page
 * lacks room for a larger key.
 */
static int
btree_update_key(struct btree *bt, struct mpage *mp, indx_t indx,
    struct btval *key)
{
	indx_t			 ptr, i, numkeys;
	int			 delta;
	size_t			 len;
	struct node		*node;
	char			*base;

	node = NODEPTR(mp, indx);
	ptr = mp->page->ptrs[indx];
	DPRINTF("update key %u (ofs %u) [%.*s] to [%.*s] on page %u",
	    indx, ptr,
	    (int)node->ksize, (char *)NODEKEY(node),
	    (int)key->size, (char *)key->data,
	    mp->pgno);

	if (key->size != node->ksize) {
		/* delta > 0: key grows, the packed node area must expand
		 * downwards by delta bytes. */
		delta = key->size - node->ksize;
		if (delta > 0 && SIZELEFT(mp) < delta) {
			DPRINTF("OUCH! Not enough room, delta = %d", delta);
			return BT_FAIL;
		}

		/* Nodes stored at or below this one shift by delta. */
		numkeys = NUMKEYS(mp);
		for (i = 0; i < numkeys; i++) {
			if (mp->page->ptrs[i] <= ptr)
				mp->page->ptrs[i] -= delta;
		}

		/* Move everything between upper and this node's header
		 * (overlapping regions; bcopy handles that). */
		base = (char *)mp->page + mp->page->upper;
		len = ptr - mp->page->upper + NODESIZE;
		bcopy(base, base - delta, len);
		mp->page->upper -= delta;

		/* The node header moved: re-resolve it and fix its size. */
		node = NODEPTR(mp, indx);
		node->ksize = key->size;
	}

	bcopy(key->data, NODEKEY(node), key->size);

	return BT_SUCCESS;
}
2151 
/* Rewrite every key on page src after its prefix length changed by
 * delta bytes.  delta > 0: the prefix grew, so each stored key sheds
 * delta bytes (leading bytes, or trailing ones with BT_REVERSEKEY).
 * delta < 0: the prefix shrank, so -delta bytes of the old prefix are
 * re-attached to each stored key.
 */
static int
btree_adjust_prefix(struct btree *bt, struct mpage *src, int delta)
{
	indx_t		 i;
	struct node	*node;
	struct btkey	 tmpkey;
	struct btval	 key;

	DPRINTF("adjusting prefix lengths on page %u with delta %d",
	    src->pgno, delta);
	assert(delta != 0);

	for (i = 0; i < NUMKEYS(src); i++) {
		node = NODEPTR(src, i);
		/* New stored length: shorter when the prefix grew. */
		tmpkey.len = node->ksize - delta;
		if (delta > 0) {
			/* Strip delta bytes from the stored key. */
			if (F_ISSET(bt->flags, BT_REVERSEKEY))
				bcopy(NODEKEY(node), tmpkey.str, tmpkey.len);
			else
				bcopy((char *)NODEKEY(node) + delta, tmpkey.str,
				    tmpkey.len);
		} else {
			/* Re-attach -delta bytes taken from the end of
			 * the old prefix. */
			if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
				bcopy(NODEKEY(node), tmpkey.str, node->ksize);
				bcopy(src->prefix.str, tmpkey.str + node->ksize,
				    -delta);
			} else {
				bcopy(src->prefix.str + src->prefix.len + delta,
				    tmpkey.str, -delta);
				bcopy(NODEKEY(node), tmpkey.str - delta,
				    node->ksize);
			}
		}
		key.size = tmpkey.len;
		key.data = tmpkey.str;
		if (btree_update_key(bt, src, i, &key) != BT_SUCCESS)
			return BT_FAIL;
	}

	return BT_SUCCESS;
}
2193 
/* Move a node from src to dst.
 *
 * Moves the node at index <srcindx> on page <src> to index <dstindx> on
 * page <dst>.  The key is re-encoded for the destination page's prefix,
 * and the parent separator keys of both pages (and, for a branch node,
 * the prefix of the child page it points to) are updated afterwards.
 *
 * Neither page may be the root (both must have a parent).
 * Returns BT_SUCCESS or BT_FAIL.
 */
static int
btree_move_node(struct btree *bt, struct mpage *src, indx_t srcindx,
    struct mpage *dst, indx_t dstindx)
{
	int			 rc;
	unsigned int		 pfxlen, mp_pfxlen = 0;
	struct node		*srcnode;
	struct mpage		*mp = NULL;
	struct btkey		 tmpkey, srckey;
	struct btval		 key, data;

	assert(src->parent);
	assert(dst->parent);

	srcnode = NODEPTR(src, srcindx);
	DPRINTF("moving %s node %u [%.*s] on page %u to node %u on page %u",
	    IS_LEAF(src) ? "leaf" : "branch",
	    srcindx,
	    (int)srcnode->ksize, (char *)NODEKEY(srcnode),
	    src->pgno,
	    dstindx, dst->pgno);

	find_common_prefix(bt, src);

	if (IS_BRANCH(src)) {
		/* Need to check if the page the moved node points to
		 * changes prefix.
		 */
		if ((mp = btree_get_mpage(bt, NODEPGNO(srcnode))) == NULL)
			return BT_FAIL;
		mp->parent = src;
		mp->parent_index = srcindx;
		find_common_prefix(bt, mp);
		mp_pfxlen = mp->prefix.len;
	}

	/* Mark src and dst as dirty. */
	if ((src = mpage_touch(bt, src)) == NULL ||
	    (dst = mpage_touch(bt, dst)) == NULL)
		return BT_FAIL;
	/* NOTE(review): srcnode was fetched from the pre-touch page and is
	 * still used below; this appears to assume mpage_touch() keeps the
	 * node data readable at the same address (or returns the same
	 * page) -- verify against mpage_touch().
	 */

	find_common_prefix(bt, dst);

	/* Check if src node has destination page prefix. Otherwise the
	 * destination page must expand its prefix on all its nodes.
	 */
	srckey.len = srcnode->ksize;
	bcopy(NODEKEY(srcnode), srckey.str, srckey.len);
	common_prefix(bt, &srckey, &dst->prefix, &tmpkey);
	if (tmpkey.len != dst->prefix.len) {
		if (btree_adjust_prefix(bt, dst,
		    tmpkey.len - dst->prefix.len) != BT_SUCCESS)
			return BT_FAIL;
		bcopy(&tmpkey, &dst->prefix, sizeof(tmpkey));
	}

	if (srcindx == 0 && IS_BRANCH(src)) {
		struct mpage	*low;

		/* must find the lowest key below src
		 */
		/* NOTE(review): the search is needed for its side effect of
		 * locating <low>; wrapping it in assert() would drop the
		 * call entirely if NDEBUG were ever defined.
		 */
		assert(btree_search_page_root(bt, src, NULL, NULL, 0,
		    &low) == BT_SUCCESS);
		expand_prefix(bt, low, 0, &srckey);
		DPRINTF("found lowest key [%.*s] on leaf page %u",
		    (int)srckey.len, srckey.str, low->pgno);
	} else {
		srckey.len = srcnode->ksize;
		bcopy(NODEKEY(srcnode), srckey.str, srcnode->ksize);
	}
	find_common_prefix(bt, src);

	/* expand the prefix */
	tmpkey.len = sizeof(tmpkey.str);
	concat_prefix(bt, src->prefix.str, src->prefix.len,
	    srckey.str, srckey.len, tmpkey.str, &tmpkey.len);

	/* Add the node to the destination page. Adjust prefix for
	 * destination page.
	 */
	key.size = tmpkey.len;
	key.data = tmpkey.str;
	remove_prefix(bt, &key, dst->prefix.len);
	data.size = NODEDSZ(srcnode);
	data.data = NODEDATA(srcnode);
	rc = btree_add_node(bt, dst, dstindx, &key, &data, NODEPGNO(srcnode),
	    srcnode->flags);
	if (rc != BT_SUCCESS)
		return rc;

	/* Delete the node from the source page.
	 */
	btree_del_node(bt, src, srcindx);

	/* Update the parent separators.
	 */
	if (srcindx == 0 && src->parent_index != 0) {
		expand_prefix(bt, src, 0, &tmpkey);
		key.size = tmpkey.len;
		key.data = tmpkey.str;
		remove_prefix(bt, &key, src->parent->prefix.len);

		DPRINTF("update separator for source page %u to [%.*s]",
		    src->pgno, (int)key.size, (char *)key.data);
		if (btree_update_key(bt, src->parent, src->parent_index,
		    &key) != BT_SUCCESS)
			return BT_FAIL;
	}

	if (srcindx == 0 && IS_BRANCH(src)) {
		/* The leftmost node of a branch page carries an implicit
		 * (empty) key.
		 */
		struct btval	 nullkey;
		nullkey.size = 0;
		assert(btree_update_key(bt, src, 0, &nullkey) == BT_SUCCESS);
	}

	if (dstindx == 0 && dst->parent_index != 0) {
		expand_prefix(bt, dst, 0, &tmpkey);
		key.size = tmpkey.len;
		key.data = tmpkey.str;
		remove_prefix(bt, &key, dst->parent->prefix.len);

		DPRINTF("update separator for destination page %u to [%.*s]",
		    dst->pgno, (int)key.size, (char *)key.data);
		if (btree_update_key(bt, dst->parent, dst->parent_index,
		    &key) != BT_SUCCESS)
			return BT_FAIL;
	}

	if (dstindx == 0 && IS_BRANCH(dst)) {
		struct btval	 nullkey;
		nullkey.size = 0;
		assert(btree_update_key(bt, dst, 0, &nullkey) == BT_SUCCESS);
	}

	/* We can get a new page prefix here!
	 * Must update keys in all nodes of this page!
	 */
	pfxlen = src->prefix.len;
	find_common_prefix(bt, src);
	if (src->prefix.len != pfxlen) {
		if (btree_adjust_prefix(bt, src,
		    src->prefix.len - pfxlen) != BT_SUCCESS)
			return BT_FAIL;
	}

	pfxlen = dst->prefix.len;
	find_common_prefix(bt, dst);
	if (dst->prefix.len != pfxlen) {
		if (btree_adjust_prefix(bt, dst,
		    dst->prefix.len - pfxlen) != BT_SUCCESS)
			return BT_FAIL;
	}

	if (IS_BRANCH(dst)) {
		/* The moved child page may see a different prefix now that
		 * its parent is <dst>; rewrite its keys if so.
		 */
		assert(mp);
		mp->parent = dst;
		mp->parent_index = dstindx;
		find_common_prefix(bt, mp);
		if (mp->prefix.len != mp_pfxlen) {
			DPRINTF("moved branch node has changed prefix");
			if ((mp = mpage_touch(bt, mp)) == NULL)
				return BT_FAIL;
			if (btree_adjust_prefix(bt, mp,
			    mp->prefix.len - mp_pfxlen) != BT_SUCCESS)
				return BT_FAIL;
		}
	}

	return BT_SUCCESS;
}
2366 
/* Merge all nodes of page <src> into page <dst>, then unlink <src> from
 * its parent and rebalance the parent.  Keys are re-encoded for the
 * destination page's (possibly shortened) prefix as they are copied.
 *
 * Neither page may be the root.  Returns BT_SUCCESS or BT_FAIL.
 */
static int
btree_merge(struct btree *bt, struct mpage *src, struct mpage *dst)
{
	int			 rc;
	indx_t			 i;
	unsigned int		 pfxlen;
	struct node		*srcnode;
	struct btkey		 tmpkey, dstpfx;
	struct btval		 key, data;

	DPRINTF("merging page %u and %u", src->pgno, dst->pgno);

	assert(src->parent);	/* can't merge root page */
	assert(dst->parent);
	assert(bt->txn != NULL);

	/* Mark src and dst as dirty. */
	if ((src = mpage_touch(bt, src)) == NULL ||
	    (dst = mpage_touch(bt, dst)) == NULL)
		return BT_FAIL;

	find_common_prefix(bt, src);
	find_common_prefix(bt, dst);

	/* Check if source nodes has destination page prefix. Otherwise
	 * the destination page must expand its prefix on all its nodes.
	 */
	common_prefix(bt, &src->prefix, &dst->prefix, &dstpfx);
	if (dstpfx.len != dst->prefix.len) {
		if (btree_adjust_prefix(bt, dst,
		    dstpfx.len - dst->prefix.len) != BT_SUCCESS)
			return BT_FAIL;
		bcopy(&dstpfx, &dst->prefix, sizeof(dstpfx));
	}

	/* Move all nodes from src to dst.
	 */
	for (i = 0; i < NUMKEYS(src); i++) {
		srcnode = NODEPTR(src, i);

		/* If branch node 0 (implicit key), find the real key.
		 */
		if (i == 0 && IS_BRANCH(src)) {
			struct mpage	*low;

			/* must find the lowest key below src
			 */
			/* NOTE(review): the search is needed for its side
			 * effect of locating <low>; wrapping it in assert()
			 * would drop the call if NDEBUG were ever defined.
			 */
			assert(btree_search_page_root(bt, src, NULL, NULL, 0,
			    &low) == BT_SUCCESS);
			expand_prefix(bt, low, 0, &tmpkey);
			DPRINTF("found lowest key [%.*s] on leaf page %u",
			    (int)tmpkey.len, tmpkey.str, low->pgno);
		} else {
			expand_prefix(bt, src, i, &tmpkey);
		}

		key.size = tmpkey.len;
		key.data = tmpkey.str;

		/* Re-encode the key relative to dst's prefix and append
		 * the node at the end of dst.
		 */
		remove_prefix(bt, &key, dst->prefix.len);
		data.size = NODEDSZ(srcnode);
		data.data = NODEDATA(srcnode);
		rc = btree_add_node(bt, dst, NUMKEYS(dst), &key,
		    &data, NODEPGNO(srcnode), srcnode->flags);
		if (rc != BT_SUCCESS)
			return rc;
	}

	DPRINTF("dst page %u now has %lu keys (%.1f%% filled)",
	    dst->pgno, NUMKEYS(dst), (float)PAGEFILL(bt, dst) / 10);

	/* Unlink the src page from parent.
	 */
	btree_del_node(bt, src->parent, src->parent_index);
	if (src->parent_index == 0) {
		/* src was the leftmost child; the new leftmost separator
		 * must be cleared (it is implicit).
		 */
		key.size = 0;
		if (btree_update_key(bt, src->parent, 0, &key) != BT_SUCCESS)
			return BT_FAIL;

		pfxlen = src->prefix.len;
		find_common_prefix(bt, src);
		assert (src->prefix.len == pfxlen);
	}

	/* Update page accounting; src is now unreferenced. */
	if (IS_LEAF(src))
		bt->meta.leaf_pages--;
	else
		bt->meta.branch_pages--;

	return btree_rebalance(bt, src->parent);
}
2458 
2459 #define FILL_THRESHOLD	 250
2460 
/* Rebalance page <mp> after a deletion left it below FILL_THRESHOLD.
 *
 * If <mp> is the root, the tree may shrink (empty tree, or collapse of a
 * single-child branch root).  Otherwise a sibling is located: if the
 * sibling is well-filled, one node is moved over; if not, the two pages
 * are merged (which recurses into rebalancing the parent).
 *
 * Returns BT_SUCCESS or BT_FAIL.
 */
static int
btree_rebalance(struct btree *bt, struct mpage *mp)
{
	struct node	*node;
	struct mpage	*parent;
	struct mpage	*root;
	struct mpage	*neighbor = NULL;
	indx_t		 si = 0, di = 0;

	assert(bt != NULL);
	assert(bt->txn != NULL);
	assert(mp != NULL);

	DPRINTF("rebalancing %s page %u (has %lu keys, %.1f%% full)",
	    IS_LEAF(mp) ? "leaf" : "branch",
	    mp->pgno, NUMKEYS(mp), (float)PAGEFILL(bt, mp) / 10);

	if (PAGEFILL(bt, mp) >= FILL_THRESHOLD) {
		DPRINTF("no need to rebalance page %u, above fill threshold",
		    mp->pgno);
		return BT_SUCCESS;
	}

	parent = mp->parent;

	if (parent == NULL) {
		/* <mp> is the root page. */
		if (NUMKEYS(mp) == 0) {
			DPRINTF("tree is completely empty");
			bt->txn->root = P_INVALID;
			bt->meta.depth--;
			bt->meta.leaf_pages--;
		} else if (IS_BRANCH(mp) && NUMKEYS(mp) == 1) {
			/* A branch root with a single child: promote the
			 * child to root and drop one level.
			 */
			DPRINTF("collapsing root page!");
			bt->txn->root = NODEPGNO(NODEPTR(mp, 0));
			if ((root = btree_get_mpage(bt, bt->txn->root)) == NULL)
				return BT_FAIL;
			root->parent = NULL;
			bt->meta.depth--;
			bt->meta.branch_pages--;
		} else
			DPRINTF("root page doesn't need rebalancing");
		return BT_SUCCESS;
	}

	/* The parent (branch page) must have at least 2 pointers,
	 * otherwise the tree is invalid.
	 */
	assert(NUMKEYS(parent) > 1);

	/* Leaf page fill factor is below the threshold.
	 * Try to move keys from left or right neighbor, or
	 * merge with a neighbor page.
	 */

	/* Find neighbors.
	 */
	if (mp->parent_index == 0) {
		/* We're the leftmost leaf in our parent.
		 */
		DPRINTF("reading right neighbor");
		node = NODEPTR(parent, mp->parent_index + 1);
		if ((neighbor = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
			return BT_FAIL;
		neighbor->parent_index = mp->parent_index + 1;
		/* Take the neighbor's first key; append after our last. */
		si = 0;
		di = NUMKEYS(mp);
	} else {
		/* There is at least one neighbor to the left.
		 */
		DPRINTF("reading left neighbor");
		node = NODEPTR(parent, mp->parent_index - 1);
		if ((neighbor = btree_get_mpage(bt, NODEPGNO(node))) == NULL)
			return BT_FAIL;
		neighbor->parent_index = mp->parent_index - 1;
		/* Take the neighbor's last key; insert before our first. */
		si = NUMKEYS(neighbor) - 1;
		di = 0;
	}
	neighbor->parent = parent;

	DPRINTF("found neighbor page %u (%lu keys, %.1f%% full)",
	    neighbor->pgno, NUMKEYS(neighbor), (float)PAGEFILL(bt, neighbor) / 10);

	/* If the neighbor page is above threshold and has at least two
	 * keys, move one key from it.
	 *
	 * Otherwise we should try to merge them, but that might not be
	 * possible, even if both are below threshold, as prefix expansion
	 * might make keys larger. FIXME: detect this
	 */
	if (PAGEFILL(bt, neighbor) >= FILL_THRESHOLD && NUMKEYS(neighbor) >= 2)
		return btree_move_node(bt, neighbor, si, mp, di);
	else { /* FIXME: if (has_enough_room()) */
		/* Always merge the right page into the left one. */
		if (mp->parent_index == 0)
			return btree_merge(bt, neighbor, mp);
		else
			return btree_merge(bt, mp, neighbor);
	}
}
2559 
2560 int
2561 btree_txn_del(struct btree *bt, struct btree_txn *txn,
2562     struct btval *key, struct btval *data)
2563 {
2564 	int		 rc, exact, close_txn = 0;
2565 	unsigned int	 ki;
2566 	struct node	*leaf;
2567 	struct mpage	*mp;
2568 
2569 	DPRINTF("========> delete key %.*s", (int)key->size, (char *)key->data);
2570 
2571 	assert(key != NULL);
2572 
2573 	if (bt != NULL && txn != NULL && bt != txn->bt) {
2574 		errno = EINVAL;
2575 		return BT_FAIL;
2576 	}
2577 
2578 	if (txn != NULL && F_ISSET(txn->flags, BT_TXN_RDONLY)) {
2579 		errno = EINVAL;
2580 		return BT_FAIL;
2581 	}
2582 
2583 	if (bt == NULL) {
2584 		if (txn == NULL) {
2585 			errno = EINVAL;
2586 			return BT_FAIL;
2587 		}
2588 		bt = txn->bt;
2589 	}
2590 
2591 	if (key->size == 0 || key->size > MAXKEYSIZE) {
2592 		errno = EINVAL;
2593 		return BT_FAIL;
2594 	}
2595 
2596 	if (txn == NULL) {
2597 		close_txn = 1;
2598 		if ((txn = btree_txn_begin(bt, 0)) == NULL)
2599 			return BT_FAIL;
2600 	}
2601 
2602 	if ((rc = btree_search_page(bt, txn, key, NULL, 1, &mp)) != BT_SUCCESS)
2603 		goto done;
2604 
2605 	leaf = btree_search_node(bt, mp, key, &exact, &ki);
2606 	if (leaf == NULL || !exact) {
2607 		errno = ENOENT;
2608 		rc = BT_FAIL;
2609 		goto done;
2610 	}
2611 
2612 	if (data && (rc = btree_read_data(bt, NULL, leaf, data)) != BT_SUCCESS)
2613 		goto done;
2614 
2615 	btree_del_node(bt, mp, ki);
2616 	bt->meta.entries--;
2617 	rc = btree_rebalance(bt, mp);
2618 	if (rc != BT_SUCCESS)
2619 		txn->flags |= BT_TXN_ERROR;
2620 
2621 done:
2622 	if (close_txn) {
2623 		if (rc == BT_SUCCESS)
2624 			rc = btree_txn_commit(txn);
2625 		else
2626 			btree_txn_abort(txn);
2627 	}
2628 	mpage_prune(bt);
2629 	return rc;
2630 }
2631 
2632 /* Reduce the length of the prefix separator <sep> to the minimum length that
2633  * still makes it uniquely distinguishable from <min>.
2634  *
2635  * <min> is guaranteed to be sorted less than <sep>
2636  *
2637  * On return, <sep> is modified to the minimum length.
2638  */
2639 static void
2640 bt_reduce_separator(struct btree *bt, struct node *min, struct btval *sep)
2641 {
2642 	size_t		 n = 0;
2643 	char		*p1;
2644 	char		*p2;
2645 
2646 	if (F_ISSET(bt->flags, BT_REVERSEKEY)) {
2647 
2648 		assert(sep->size > 0);
2649 
2650 		p1 = (char *)NODEKEY(min) + min->ksize - 1;
2651 		p2 = (char *)sep->data + sep->size - 1;
2652 
2653 		while (p1 >= (char *)NODEKEY(min) && *p1 == *p2) {
2654 			assert(p2 > (char *)sep->data);
2655 			p1--;
2656 			p2--;
2657 			n++;
2658 		}
2659 
2660 		sep->data = p2;
2661 		sep->size = n + 1;
2662 	} else {
2663 
2664 		assert(min->ksize > 0);
2665 		assert(sep->size > 0);
2666 
2667 		p1 = (char *)NODEKEY(min);
2668 		p2 = (char *)sep->data;
2669 
2670 		while (*p1 == *p2) {
2671 			p1++;
2672 			p2++;
2673 			n++;
2674 			if (n == min->ksize || n == sep->size)
2675 				break;
2676 		}
2677 
2678 		sep->size = n + 1;
2679 	}
2680 
2681 	DPRINTF("reduced separator to [%.*s] > [%.*s]",
2682 	    (int)sep->size, (char *)sep->data,
2683 	    (int)min->ksize, (char *)NODEKEY(min));
2684 }
2685 
/* Split page <*mpp>, and insert <key,(data|newpgno)> in either left or
 * right sibling, at index <*newindxp> (as if unsplit). Updates *mpp and
 * *newindxp with the actual values after split, ie if *mpp and *newindxp
 * refer to a node in the new right sibling page.
 *
 * For a leaf page the new entry carries <newdata>; for a branch page it
 * carries the child page number <newpgno>.  May recurse to split the
 * parent, and creates a new root if <*mpp> was the root page.
 * Returns BT_SUCCESS or BT_FAIL.
 */
static int
btree_split(struct btree *bt, struct mpage **mpp, unsigned int *newindxp,
    struct btval *newkey, struct btval *newdata, pgno_t newpgno)
{
	uint8_t		 flags;
	int		 rc = BT_SUCCESS, ins_new = 0;
	indx_t		 newindx;
	pgno_t		 pgno = 0;
	size_t		 orig_pfx_len, left_pfx_diff, right_pfx_diff, pfx_diff;
	unsigned int	 i, j, split_indx;
	struct node	*node;
	struct mpage	*pright, *p, *mp;
	struct btval	 sepkey, rkey, rdata;
	struct btkey	 tmpkey;
	struct page	*copy;

	assert(bt != NULL);
	assert(bt->txn != NULL);

	mp = *mpp;
	newindx = *newindxp;

	DPRINTF("-----> splitting %s page %u and adding [%.*s] at index %d",
	    IS_LEAF(mp) ? "leaf" : "branch", mp->pgno,
	    (int)newkey->size, (char *)newkey->data, *newindxp);
	DPRINTF("page %u has prefix [%.*s]", mp->pgno,
	    (int)mp->prefix.len, (char *)mp->prefix.str);
	orig_pfx_len = mp->prefix.len;

	if (mp->parent == NULL) {
		/* Splitting the root: grow the tree by one level. */
		if ((mp->parent = btree_new_page(bt, P_BRANCH)) == NULL)
			return BT_FAIL;
		mp->parent_index = 0;
		bt->txn->root = mp->parent->pgno;
		DPRINTF("root split! new root = %u", mp->parent->pgno);
		bt->meta.depth++;

		/* Add left (implicit) pointer. */
		if (btree_add_node(bt, mp->parent, 0, NULL, NULL,
		    mp->pgno, 0) != BT_SUCCESS)
			return BT_FAIL;
	} else {
		DPRINTF("parent branch page is %u", mp->parent->pgno);
	}

	/* Create a right sibling. */
	if ((pright = btree_new_page(bt, mp->page->flags)) == NULL)
		return BT_FAIL;
	pright->parent = mp->parent;
	pright->parent_index = mp->parent_index + 1;
	DPRINTF("new right sibling: page %u", pright->pgno);

	/* Move half of the keys to the right sibling. */
	if ((copy = malloc(bt->head.psize)) == NULL)
		return BT_FAIL;
	bcopy(mp->page, copy, bt->head.psize);
	assert(mp->ref == 0);				/* XXX */
	/* Reset the left page; nodes are re-inserted from <copy> below. */
	memset(&mp->page->ptrs, 0, bt->head.psize - PAGEHDRSZ);
	mp->page->lower = PAGEHDRSZ;
	mp->page->upper = bt->head.psize;

	split_indx = NUMKEYSP(copy) / 2 + 1;

	/* First find the separating key between the split pages.
	 */
	memset(&sepkey, 0, sizeof(sepkey));
	if (newindx == split_indx) {
		/* The new key itself lands on the split point and becomes
		 * the separator.
		 */
		sepkey.size = newkey->size;
		sepkey.data = newkey->data;
		remove_prefix(bt, &sepkey, mp->prefix.len);
	} else {
		node = NODEPTRP(copy, split_indx);
		sepkey.size = node->ksize;
		sepkey.data = NODEKEY(node);
	}

	if (IS_LEAF(mp) && bt->cmp == NULL) {
		/* Find the smallest separator. */
		/* Ref: Prefix B-trees, R. Bayer, K. Unterauer, 1977 */
		node = NODEPTRP(copy, split_indx - 1);
		bt_reduce_separator(bt, node, &sepkey);
	}

	/* Fix separator wrt parent prefix. */
	if (bt->cmp == NULL) {
		tmpkey.len = sizeof(tmpkey.str);
		concat_prefix(bt, mp->prefix.str, mp->prefix.len,
		    sepkey.data, sepkey.size, tmpkey.str, &tmpkey.len);
		sepkey.data = tmpkey.str;
		sepkey.size = tmpkey.len;
	}

	DPRINTF("separator is [%.*s]", (int)sepkey.size, (char *)sepkey.data);

	/* Copy separator key to the parent.
	 */
	if (SIZELEFT(pright->parent) < bt_branch_size(bt, &sepkey)) {
		/* No room in the parent: split it recursively. */
		rc = btree_split(bt, &pright->parent, &pright->parent_index,
		    &sepkey, NULL, pright->pgno);

		/* Right page might now have changed parent.
		 * Check if left page also changed parent.
		 */
		if (pright->parent != mp->parent &&
		    mp->parent_index >= NUMKEYS(mp->parent)) {
			mp->parent = pright->parent;
			mp->parent_index = pright->parent_index - 1;
		}
	} else {
		remove_prefix(bt, &sepkey, pright->parent->prefix.len);
		rc = btree_add_node(bt, pright->parent, pright->parent_index,
		    &sepkey, NULL, pright->pgno, 0);
	}
	if (rc != BT_SUCCESS) {
		free(copy);
		return BT_FAIL;
	}

	/* Update prefix for right and left page, if the parent was split.
	 */
	find_common_prefix(bt, pright);
	assert(orig_pfx_len <= pright->prefix.len);
	right_pfx_diff = pright->prefix.len - orig_pfx_len;

	find_common_prefix(bt, mp);
	assert(orig_pfx_len <= mp->prefix.len);
	left_pfx_diff = mp->prefix.len - orig_pfx_len;

	/* Redistribute the nodes from <copy> over the left page <mp> and
	 * the right sibling <pright>, inserting the new entry at <newindx>.
	 * <i> walks the nodes in <copy>; <j> is the insert index in the
	 * page currently being filled.
	 */
	for (i = j = 0; i <= NUMKEYSP(copy); j++) {
		if (i < split_indx) {
			/* Re-insert in left sibling. */
			p = mp;
			pfx_diff = left_pfx_diff;
		} else {
			/* Insert in right sibling. */
			if (i == split_indx)
				/* Reset insert index for right sibling. */
				j = (i == newindx && ins_new);
			p = pright;
			pfx_diff = right_pfx_diff;
		}

		if (i == newindx && !ins_new) {
			/* Insert the original entry that caused the split. */
			rkey.data = newkey->data;
			rkey.size = newkey->size;
			if (IS_LEAF(mp)) {
				rdata.data = newdata->data;
				rdata.size = newdata->size;
			} else
				pgno = newpgno;
			flags = 0;
			/* The new key is un-prefixed; strip the target
			 * page's whole prefix below.
			 */
			pfx_diff = p->prefix.len;

			ins_new = 1;

			/* Update page and index for the new key. */
			*newindxp = j;
			*mpp = p;
		} else if (i == NUMKEYSP(copy)) {
			break;
		} else {
			node = NODEPTRP(copy, i);
			rkey.data = NODEKEY(node);
			rkey.size = node->ksize;
			if (IS_LEAF(mp)) {
				rdata.data = NODEDATA(node);
				rdata.size = node->n_dsize;
			} else
				pgno = node->n_pgno;
			flags = node->flags;

			i++;
		}

		if (!IS_LEAF(mp) && j == 0) {
			/* First branch index doesn't need key data. */
			rkey.size = 0;
		} else
			remove_prefix(bt, &rkey, pfx_diff);

		/* NOTE(review): rc from this btree_add_node() is only
		 * examined after the loop ends; an early failure is not
		 * detected until all iterations ran -- confirm intended.
		 */
		rc = btree_add_node(bt, p, j, &rkey, &rdata, pgno,flags);
	}

	free(copy);
	return rc;
}
2878 
2879 int
2880 btree_txn_put(struct btree *bt, struct btree_txn *txn,
2881     struct btval *key, struct btval *data, unsigned int flags)
2882 {
2883 	int		 rc = BT_SUCCESS, exact, close_txn = 0;
2884 	unsigned int	 ki;
2885 	struct node	*leaf;
2886 	struct mpage	*mp;
2887 	struct btval	 xkey;
2888 
2889 	assert(key != NULL);
2890 	assert(data != NULL);
2891 
2892 	if (bt != NULL && txn != NULL && bt != txn->bt) {
2893 		errno = EINVAL;
2894 		return BT_FAIL;
2895 	}
2896 
2897 	if (txn != NULL && F_ISSET(txn->flags, BT_TXN_RDONLY)) {
2898 		errno = EINVAL;
2899 		return BT_FAIL;
2900 	}
2901 
2902 	if (bt == NULL) {
2903 		if (txn == NULL) {
2904 			errno = EINVAL;
2905 			return BT_FAIL;
2906 		}
2907 		bt = txn->bt;
2908 	}
2909 
2910 	if (key->size == 0 || key->size > MAXKEYSIZE) {
2911 		errno = EINVAL;
2912 		return BT_FAIL;
2913 	}
2914 
2915 	DPRINTF("==> put key %.*s, size %zu, data size %zu",
2916 		(int)key->size, (char *)key->data, key->size, data->size);
2917 
2918 	if (txn == NULL) {
2919 		close_txn = 1;
2920 		if ((txn = btree_txn_begin(bt, 0)) == NULL)
2921 			return BT_FAIL;
2922 	}
2923 
2924 	rc = btree_search_page(bt, txn, key, NULL, 1, &mp);
2925 	if (rc == BT_SUCCESS) {
2926 		leaf = btree_search_node(bt, mp, key, &exact, &ki);
2927 		if (leaf && exact) {
2928 			if (F_ISSET(flags, BT_NOOVERWRITE)) {
2929 				DPRINTF("duplicate key %.*s",
2930 				    (int)key->size, (char *)key->data);
2931 				errno = EEXIST;
2932 				rc = BT_FAIL;
2933 				goto done;
2934 			}
2935 			btree_del_node(bt, mp, ki);
2936 		}
2937 		if (leaf == NULL) {		/* append if not found */
2938 			ki = NUMKEYS(mp);
2939 			DPRINTF("appending key at index %d", ki);
2940 		}
2941 	} else if (errno == ENOENT) {
2942 		/* new file, just write a root leaf page */
2943 		DPRINTF("allocating new root leaf page");
2944 		if ((mp = btree_new_page(bt, P_LEAF)) == NULL) {
2945 			rc = BT_FAIL;
2946 			goto done;
2947 		}
2948 		txn->root = mp->pgno;
2949 		bt->meta.depth++;
2950 		ki = 0;
2951 	}
2952 	else
2953 		goto done;
2954 
2955 	assert(IS_LEAF(mp));
2956 	DPRINTF("there are %lu keys, should insert new key at index %u",
2957 		NUMKEYS(mp), ki);
2958 
2959 	/* Copy the key pointer as it is modified by the prefix code. The
2960 	 * caller might have malloc'ed the data.
2961 	 */
2962 	xkey.data = key->data;
2963 	xkey.size = key->size;
2964 
2965 	if (SIZELEFT(mp) < bt_leaf_size(bt, key, data)) {
2966 		rc = btree_split(bt, &mp, &ki, &xkey, data, P_INVALID);
2967 	} else {
2968 		/* There is room already in this leaf page. */
2969 		remove_prefix(bt, &xkey, mp->prefix.len);
2970 		rc = btree_add_node(bt, mp, ki, &xkey, data, 0, 0);
2971 	}
2972 
2973 	if (rc != BT_SUCCESS)
2974 		txn->flags |= BT_TXN_ERROR;
2975 	else
2976 		bt->meta.entries++;
2977 
2978 done:
2979 	if (close_txn) {
2980 		if (rc == BT_SUCCESS)
2981 			rc = btree_txn_commit(txn);
2982 		else
2983 			btree_txn_abort(txn);
2984 	}
2985 	mpage_prune(bt);
2986 	return rc;
2987 }
2988 
/* Recursively copy the subtree rooted at <pgno> in <bt> into the
 * compacting tree <btc>, appending pages in depth-first order.  Child
 * page numbers, big-data overflow references and overflow chain links
 * are rewritten to their new locations before each page is written out.
 *
 * Returns the new page number of the copied page, or P_INVALID on error.
 */
static pgno_t
btree_compact_tree(struct btree *bt, pgno_t pgno, struct btree *btc)
{
	ssize_t		 rc;
	indx_t		 i;
	pgno_t		*pnext, next;
	struct node	*node;
	struct page	*p;
	struct mpage	*mp;

	/* Get the page and make a copy of it.
	 */
	if ((mp = btree_get_mpage(bt, pgno)) == NULL)
		return P_INVALID;
	if ((p = malloc(bt->head.psize)) == NULL)
		return P_INVALID;
	bcopy(mp->page, p, bt->head.psize);

	/* Go through all nodes in the (copied) page and update the
	 * page pointers.
	 */
	if (F_ISSET(p->flags, P_BRANCH)) {
		/* Branch page: rewrite each child page number. */
		for (i = 0; i < NUMKEYSP(p); i++) {
			node = NODEPTRP(p, i);
			node->n_pgno = btree_compact_tree(bt, node->n_pgno, btc);
			if (node->n_pgno == P_INVALID) {
				free(p);
				return P_INVALID;
			}
		}
	} else if (F_ISSET(p->flags, P_LEAF)) {
		/* Leaf page: rewrite overflow-page references of big data. */
		for (i = 0; i < NUMKEYSP(p); i++) {
			node = NODEPTRP(p, i);
			if (F_ISSET(node->flags, F_BIGDATA)) {
				/* The data area holds a possibly unaligned
				 * pgno; copy it out and back with bcopy.
				 */
				bcopy(NODEDATA(node), &next, sizeof(next));
				next = btree_compact_tree(bt, next, btc);
				if (next == P_INVALID) {
					free(p);
					return P_INVALID;
				}
				bcopy(&next, NODEDATA(node), sizeof(next));
			}
		}
	} else if (F_ISSET(p->flags, P_OVERFLOW)) {
		/* Overflow page: rewrite the link to the next chunk. */
		pnext = &p->p_next_pgno;
		if (*pnext > 0) {
			*pnext = btree_compact_tree(bt, *pnext, btc);
			if (*pnext == P_INVALID) {
				free(p);
				return P_INVALID;
			}
		}
	} else
		assert(0);

	/* Allocate the next page number in the compacted file and append
	 * the page with a plain write.
	 */
	pgno = p->pgno = btc->txn->next_pgno++;
	rc = write(btc->fd, p, bt->head.psize);
	free(p);
	if (rc != (ssize_t)bt->head.psize)
		return P_INVALID;
	mpage_prune(bt);
	return pgno;
}
3052 
3053 int
3054 btree_compact(struct btree *bt)
3055 {
3056 	char			*compact_path = NULL;
3057 	struct btree		*btc;
3058 	struct btree_txn	*txn, *txnc = NULL;
3059 	int			 fd;
3060 	pgno_t			 root;
3061 
3062 	assert(bt != NULL);
3063 
3064 	DPRINTF("compacting btree %p with path %s", bt, bt->path);
3065 
3066 	if (bt->path == NULL) {
3067 		errno = EINVAL;
3068 		return BT_FAIL;
3069 	}
3070 
3071 	if ((txn = btree_txn_begin(bt, 0)) == NULL)
3072 		return BT_FAIL;
3073 
3074 	if (asprintf(&compact_path, "%s.compact.XXXXXX", bt->path) == -1) {
3075 		btree_txn_abort(txn);
3076 		return BT_FAIL;
3077 	}
3078 	fd = mkstemp(compact_path);
3079 	if (fd == -1) {
3080 		free(compact_path);
3081 		btree_txn_abort(txn);
3082 		return BT_FAIL;
3083 	}
3084 
3085 	if ((btc = btree_open_fd(fd, 0)) == NULL)
3086 		goto failed;
3087 	bcopy(&bt->meta, &btc->meta, sizeof(bt->meta));
3088 	btc->meta.revisions = 0;
3089 
3090 	if ((txnc = btree_txn_begin(btc, 0)) == NULL)
3091 		goto failed;
3092 
3093 	if (bt->meta.root != P_INVALID) {
3094 		root = btree_compact_tree(bt, bt->meta.root, btc);
3095 		if (root == P_INVALID)
3096 			goto failed;
3097 		if (btree_write_meta(btc, root, 0) != BT_SUCCESS)
3098 			goto failed;
3099 	}
3100 
3101 	fsync(fd);
3102 
3103 	DPRINTF("renaming %s to %s", compact_path, bt->path);
3104 	if (rename(compact_path, bt->path) != 0)
3105 		goto failed;
3106 
3107 	/* Write a "tombstone" meta page so other processes can pick up
3108 	 * the change and re-open the file.
3109 	 */
3110 	if (btree_write_meta(bt, P_INVALID, BT_TOMBSTONE) != BT_SUCCESS)
3111 		goto failed;
3112 
3113 	btree_txn_abort(txn);
3114 	btree_txn_abort(txnc);
3115 	free(compact_path);
3116 	btree_close(btc);
3117 	mpage_prune(bt);
3118 	return 0;
3119 
3120 failed:
3121 	btree_txn_abort(txn);
3122 	btree_txn_abort(txnc);
3123 	unlink(compact_path);
3124 	free(compact_path);
3125 	btree_close(btc);
3126 	mpage_prune(bt);
3127 	return BT_FAIL;
3128 }
3129 
3130 /* Reverts the last change. Truncates the file at the last root page.
3131  */
3132 int
3133 btree_revert(struct btree *bt)
3134 {
3135 	if (btree_read_meta(bt, NULL) != 0)
3136 		return -1;
3137 
3138 	DPRINTF("truncating file at page %u", bt->meta.root);
3139 	return ftruncate(bt->fd, bt->head.psize * bt->meta.root);
3140 }
3141 
3142 void
3143 btree_set_cache_size(struct btree *bt, unsigned int cache_size)
3144 {
3145 	bt->stat.max_cache = cache_size;
3146 }
3147 
3148 unsigned int
3149 btree_get_flags(struct btree *bt)
3150 {
3151 	return (bt->flags & ~BT_FIXPADDING);
3152 }
3153 
3154 const char *
3155 btree_get_path(struct btree *bt)
3156 {
3157 	return bt->path;
3158 }
3159 
3160 const struct btree_stat *
3161 btree_stat(struct btree *bt)
3162 {
3163 	if (bt == NULL)
3164 		return NULL;
3165 
3166 	bt->stat.branch_pages = bt->meta.branch_pages;
3167 	bt->stat.leaf_pages = bt->meta.leaf_pages;
3168 	bt->stat.overflow_pages = bt->meta.overflow_pages;
3169 	bt->stat.revisions = bt->meta.revisions;
3170 	bt->stat.depth = bt->meta.depth;
3171 	bt->stat.entries = bt->meta.entries;
3172 	bt->stat.psize = bt->head.psize;
3173 	bt->stat.created_at = bt->meta.created_at;
3174 
3175 	return &bt->stat;
3176 }
3177 
3178