1 /*	$NetBSD: vfs_wapbl.c,v 1.78 2016/05/19 18:32:29 riastradh Exp $	*/
2 
3 /*-
4  * Copyright (c) 2003, 2008, 2009 The NetBSD Foundation, Inc.
5  * All rights reserved.
6  *
7  * This code is derived from software contributed to The NetBSD Foundation
8  * by Wasabi Systems, Inc.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 /*
33  * This implements file-system-independent write-ahead logging (WAPBL).
34  */
35 
36 #define WAPBL_INTERNAL
37 
38 #include <sys/cdefs.h>
39 __KERNEL_RCSID(0, "$NetBSD: vfs_wapbl.c,v 1.78 2016/05/19 18:32:29 riastradh Exp $");
40 
41 #include <sys/param.h>
42 #include <sys/bitops.h>
43 #include <sys/time.h>
44 #include <sys/wapbl.h>
45 #include <sys/wapbl_replay.h>
46 
47 #ifdef _KERNEL
48 
49 #include <sys/atomic.h>
50 #include <sys/conf.h>
51 #include <sys/file.h>
52 #include <sys/kauth.h>
53 #include <sys/kernel.h>
54 #include <sys/module.h>
55 #include <sys/mount.h>
56 #include <sys/mutex.h>
57 #include <sys/namei.h>
58 #include <sys/proc.h>
59 #include <sys/resourcevar.h>
60 #include <sys/sysctl.h>
61 #include <sys/uio.h>
62 #include <sys/vnode.h>
63 
64 #include <miscfs/specfs/specdev.h>
65 
66 #define	wapbl_alloc(s) kmem_alloc((s), KM_SLEEP)
67 #define	wapbl_free(a, s) kmem_free((a), (s))
68 #define	wapbl_calloc(n, s) kmem_zalloc((n)*(s), KM_SLEEP)
69 
70 static struct sysctllog *wapbl_sysctl;
71 static int wapbl_flush_disk_cache = 1;
72 static int wapbl_verbose_commit = 0;
73 
74 static inline size_t wapbl_space_free(size_t, off_t, off_t);
75 
76 #else /* !_KERNEL */
77 
78 #include <assert.h>
79 #include <errno.h>
80 #include <stdbool.h>
81 #include <stdio.h>
82 #include <stdlib.h>
83 #include <string.h>
84 
85 #define	KDASSERT(x) assert(x)
86 #define	KASSERT(x) assert(x)
87 #define	wapbl_alloc(s) malloc(s)
88 #define	wapbl_free(a, s) free(a)
89 #define	wapbl_calloc(n, s) calloc((n), (s))
90 
91 #endif /* !_KERNEL */
92 
93 /*
94  * INTERNAL DATA STRUCTURES
95  */
96 
97 /*
98  * This structure holds per-mount log information.
99  *
100  * Legend:	a = atomic access only
101  *		r = read-only after init
102  *		l = rwlock held
103  *		m = mutex held
104  *		lm = rwlock held writing or mutex held
105  *		u = unlocked access ok
106  *		b = bufcache_lock held
107  */
108 LIST_HEAD(wapbl_ino_head, wapbl_ino);
109 struct wapbl {
110 	struct vnode *wl_logvp;	/* r:	log here */
111 	struct vnode *wl_devvp;	/* r:	log on this device */
112 	struct mount *wl_mount;	/* r:	mountpoint wl is associated with */
113 	daddr_t wl_logpbn;	/* r:	Physical block number of start of log */
114 	int wl_log_dev_bshift;	/* r:	logarithm of device block size of log
115 					device */
116 	int wl_fs_dev_bshift;	/* r:	logarithm of device block size of
117 					filesystem device */
118 
119 	unsigned wl_lock_count;	/* m:	Count of transactions in progress */
120 
121 	size_t wl_circ_size; 	/* r:	Number of bytes in buffer of log */
122 	size_t wl_circ_off;	/* r:	Number of bytes reserved at start */
123 
124 	size_t wl_bufcount_max;	/* r:	Number of buffers reserved for log */
125 	size_t wl_bufbytes_max;	/* r:	Number of buf bytes reserved for log */
126 
127 	off_t wl_head;		/* l:	Byte offset of log head */
128 	off_t wl_tail;		/* l:	Byte offset of log tail */
129 	/*
130 	 * WAPBL log layout, stored on wl_devvp at wl_logpbn:
131 	 *
132 	 *  ___________________ wl_circ_size __________________
133 	 * /                                                   \
134 	 * +---------+---------+-------+--------------+--------+
135 	 * [ commit0 | commit1 | CCWCW | EEEEEEEEEEEE | CCCWCW ]
136 	 * +---------+---------+-------+--------------+--------+
137 	 *       wl_circ_off --^       ^-- wl_head    ^-- wl_tail
138 	 *
139 	 * commit0 and commit1 are commit headers.  A commit header has
140 	 * a generation number, indicating which of the two headers is
141 	 * more recent, and an assignment of head and tail pointers.
142 	 * The rest is a circular queue of log records, starting at
143 	 * the byte offset wl_circ_off.
144 	 *
145 	 * E marks empty space for records.
146 	 * W marks records for block writes issued but waiting.
147 	 * C marks completed records.
148 	 *
149 	 * wapbl_flush writes new records to empty `E' spaces after
150 	 * wl_head from the current transaction in memory.
151 	 *
152 	 * wapbl_truncate advances wl_tail past any completed `C'
153 	 * records, freeing them up for use.
154 	 *
155 	 * head == tail == 0 means log is empty.
156 	 * head == tail != 0 means log is full.
157 	 *
158 	 * See assertions in wapbl_advance() for other boundary
159 	 * conditions.
160 	 *
161 	 * Only wapbl_flush moves the head, except when wapbl_truncate
162 	 * sets it to 0 to indicate that the log is empty.
163 	 *
164 	 * Only wapbl_truncate moves the tail, except when wapbl_flush
165 	 * sets it to wl_circ_off to indicate that the log is full.
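	 *
	 * (A worked numeric example of this head/tail encoding follows
	 * the structure definition below.)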
166 	 */
167 
168 	struct wapbl_wc_header *wl_wc_header;	/* l	*/
169 	void *wl_wc_scratch;	/* l:	scratch space (XXX: why?!?) */
170 
171 	kmutex_t wl_mtx;	/* u:	short-term lock */
172 	krwlock_t wl_rwlock;	/* u:	File system transaction lock */
173 
174 	/*
175 	 * wl_mtx must be held while accessing wl_bufcount or wl_bufs
176 	 * ("m" above); the rwlock covers the head and tail offsets.
177 	 */
178 
179 	/*
180 	 * Callback called from within the flush routine to flush any extra
181 	 * bits.  Note that flush may be skipped without calling this if
182 	 * there are no outstanding buffers in the transaction.
183 	 */
184 #ifdef _KERNEL
185 	wapbl_flush_fn_t wl_flush;	/* r	*/
186 	wapbl_flush_fn_t wl_flush_abort;/* r	*/
187 #endif
188 
189 	size_t wl_bufbytes;	/* m:	Byte count of pages in wl_bufs */
190 	size_t wl_bufcount;	/* m:	Count of buffers in wl_bufs */
191 	size_t wl_bcount;	/* m:	Total bcount of wl_bufs */
192 
193 	LIST_HEAD(, buf) wl_bufs; /* m:	Buffers in current transaction */
194 
195 	kcondvar_t wl_reclaimable_cv;	/* m (obviously) */
196 	size_t wl_reclaimable_bytes; /* m:	Amount of space available for
197 						reclamation by truncate */
198 	int wl_error_count;	/* m:	# of wl_entries with errors */
199 	size_t wl_reserved_bytes; /* never truncate log smaller than this */
200 
201 #ifdef WAPBL_DEBUG_BUFBYTES
202 	size_t wl_unsynced_bufbytes; /* Byte count of unsynced buffers */
203 #endif
204 
205 	daddr_t *wl_deallocblks;/* lm:	address of block */
206 	int *wl_dealloclens;	/* lm:	size of block */
207 	int wl_dealloccnt;	/* lm:	total count */
208 	int wl_dealloclim;	/* l:	max count */
209 
210 	/* hashtable of inode numbers for allocated but unlinked inodes */
211 	/* synch ??? */
212 	struct wapbl_ino_head *wl_inohash;
213 	u_long wl_inohashmask;
214 	int wl_inohashcnt;
215 
216 	SIMPLEQ_HEAD(, wapbl_entry) wl_entries; /* On disk transaction
217 						   accounting */
218 
219 	u_char *wl_buffer;	/* l:   buffer for wapbl_buffered_write() */
220 	daddr_t wl_buffer_dblk;	/* l:   buffer disk block address */
221 	size_t wl_buffer_used;	/* l:   buffer current use */
222 };
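
/*
 * Worked example (not from the original source) of the head/tail
 * encoding described in the layout comment above, for a hypothetical
 * geometry of wl_circ_off = 1024 and wl_circ_size = 8192, so the
 * circular queue spans byte offsets [1024, 9216) of the log:
 *
 *	head == 0,    tail == 0		log empty
 *	head == 5120, tail == 3072	2048 bytes used, in [3072, 5120)
 *	head == 2048, tail == 7168	3072 bytes used, wrapping around:
 *					[7168, 9216) and [1024, 2048)
 *	head == tail == 3072		log full, all 8192 bytes used
 */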
223 
224 #ifdef WAPBL_DEBUG_PRINT
225 int wapbl_debug_print = WAPBL_DEBUG_PRINT;
226 #endif
227 
228 /****************************************************************/
229 #ifdef _KERNEL
230 
231 #ifdef WAPBL_DEBUG
232 struct wapbl *wapbl_debug_wl;
233 #endif
234 
235 static int wapbl_write_commit(struct wapbl *wl, off_t head, off_t tail);
236 static int wapbl_write_blocks(struct wapbl *wl, off_t *offp);
237 static int wapbl_write_revocations(struct wapbl *wl, off_t *offp);
238 static int wapbl_write_inodes(struct wapbl *wl, off_t *offp);
239 #endif /* _KERNEL */
240 
241 static int wapbl_replay_process(struct wapbl_replay *wr, off_t, off_t);
242 
243 static inline size_t wapbl_space_used(size_t avail, off_t head,
244 	off_t tail);
245 
246 #ifdef _KERNEL
247 
248 static struct pool wapbl_entry_pool;
249 
250 #define	WAPBL_INODETRK_SIZE 83
251 static int wapbl_ino_pool_refcount;
252 static struct pool wapbl_ino_pool;
253 struct wapbl_ino {
254 	LIST_ENTRY(wapbl_ino) wi_hash;
255 	ino_t wi_ino;
256 	mode_t wi_mode;
257 };
258 
259 static void wapbl_inodetrk_init(struct wapbl *wl, u_int size);
260 static void wapbl_inodetrk_free(struct wapbl *wl);
261 static struct wapbl_ino *wapbl_inodetrk_get(struct wapbl *wl, ino_t ino);
262 
263 static size_t wapbl_transaction_len(struct wapbl *wl);
264 static inline size_t wapbl_transaction_inodes_len(struct wapbl *wl);
265 
266 #if 0
267 int wapbl_replay_verify(struct wapbl_replay *, struct vnode *);
268 #endif
269 
270 static int wapbl_replay_isopen1(struct wapbl_replay *);
271 
272 struct wapbl_ops wapbl_ops = {
273 	.wo_wapbl_discard	= wapbl_discard,
274 	.wo_wapbl_replay_isopen	= wapbl_replay_isopen1,
275 	.wo_wapbl_replay_can_read = wapbl_replay_can_read,
276 	.wo_wapbl_replay_read	= wapbl_replay_read,
277 	.wo_wapbl_add_buf	= wapbl_add_buf,
278 	.wo_wapbl_remove_buf	= wapbl_remove_buf,
279 	.wo_wapbl_resize_buf	= wapbl_resize_buf,
280 	.wo_wapbl_begin		= wapbl_begin,
281 	.wo_wapbl_end		= wapbl_end,
282 	.wo_wapbl_junlock_assert= wapbl_junlock_assert,
283 
284 	/* XXX: the following is only used to say "this is a wapbl buf" */
285 	.wo_wapbl_biodone	= wapbl_biodone,
286 };
287 
288 static int
289 wapbl_sysctl_init(void)
290 {
291 	int rv;
292 	const struct sysctlnode *rnode, *cnode;
293 
294 	wapbl_sysctl = NULL;
295 
296 	rv = sysctl_createv(&wapbl_sysctl, 0, NULL, &rnode,
297 		       CTLFLAG_PERMANENT,
298 		       CTLTYPE_NODE, "wapbl",
299 		       SYSCTL_DESCR("WAPBL journaling options"),
300 		       NULL, 0, NULL, 0,
301 		       CTL_VFS, CTL_CREATE, CTL_EOL);
302 	if (rv)
303 		return rv;
304 
305 	rv = sysctl_createv(&wapbl_sysctl, 0, &rnode, &cnode,
306 		       CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
307 		       CTLTYPE_INT, "flush_disk_cache",
308 		       SYSCTL_DESCR("flush disk cache"),
309 		       NULL, 0, &wapbl_flush_disk_cache, 0,
310 		       CTL_CREATE, CTL_EOL);
311 	if (rv)
312 		return rv;
313 
314 	rv = sysctl_createv(&wapbl_sysctl, 0, &rnode, &cnode,
315 		       CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
316 		       CTLTYPE_INT, "verbose_commit",
317 		       SYSCTL_DESCR("show time and size of wapbl log commits"),
318 		       NULL, 0, &wapbl_verbose_commit, 0,
319 		       CTL_CREATE, CTL_EOL);
320 	return rv;
321 }
322 
323 static void
324 wapbl_init(void)
325 {
326 
327 	pool_init(&wapbl_entry_pool, sizeof(struct wapbl_entry), 0, 0, 0,
328 	    "wapblentrypl", &pool_allocator_kmem, IPL_VM);
329 
330 	wapbl_sysctl_init();
331 }
332 
333 static int
334 wapbl_fini(void)
335 {
336 
337 	if (wapbl_sysctl != NULL)
338 		 sysctl_teardown(&wapbl_sysctl);
339 
340 	pool_destroy(&wapbl_entry_pool);
341 
342 	return 0;
343 }
344 
345 static int
346 wapbl_start_flush_inodes(struct wapbl *wl, struct wapbl_replay *wr)
347 {
348 	int error, i;
349 
350 	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
351 	    ("wapbl_start: reusing log with %d inodes\n", wr->wr_inodescnt));
352 
353 	/*
354 	 * It's only valid to reuse the replay log if it's
355 	 * the same as the new log we just opened.
356 	 */
357 	KDASSERT(!wapbl_replay_isopen(wr));
358 	KASSERT(wl->wl_devvp->v_type == VBLK);
359 	KASSERT(wr->wr_devvp->v_type == VBLK);
360 	KASSERT(wl->wl_devvp->v_rdev == wr->wr_devvp->v_rdev);
361 	KASSERT(wl->wl_logpbn == wr->wr_logpbn);
362 	KASSERT(wl->wl_circ_size == wr->wr_circ_size);
363 	KASSERT(wl->wl_circ_off == wr->wr_circ_off);
364 	KASSERT(wl->wl_log_dev_bshift == wr->wr_log_dev_bshift);
365 	KASSERT(wl->wl_fs_dev_bshift == wr->wr_fs_dev_bshift);
366 
367 	wl->wl_wc_header->wc_generation = wr->wr_generation + 1;
368 
369 	for (i = 0; i < wr->wr_inodescnt; i++)
370 		wapbl_register_inode(wl, wr->wr_inodes[i].wr_inumber,
371 		    wr->wr_inodes[i].wr_imode);
372 
373 	/* Make sure new transaction won't overwrite old inodes list */
374 	KDASSERT(wapbl_transaction_len(wl) <=
375 	    wapbl_space_free(wl->wl_circ_size, wr->wr_inodeshead,
376 	    wr->wr_inodestail));
377 
378 	wl->wl_head = wl->wl_tail = wr->wr_inodeshead;
379 	wl->wl_reclaimable_bytes = wl->wl_reserved_bytes =
380 	    wapbl_transaction_len(wl);
381 
382 	error = wapbl_write_inodes(wl, &wl->wl_head);
383 	if (error)
384 		return error;
385 
386 	KASSERT(wl->wl_head != wl->wl_tail);
387 	KASSERT(wl->wl_head != 0);
388 
389 	return 0;
390 }
391 
392 int
393 wapbl_start(struct wapbl ** wlp, struct mount *mp, struct vnode *vp,
394 	daddr_t off, size_t count, size_t blksize, struct wapbl_replay *wr,
395 	wapbl_flush_fn_t flushfn, wapbl_flush_fn_t flushabortfn)
396 {
397 	struct wapbl *wl;
398 	struct vnode *devvp;
399 	daddr_t logpbn;
400 	int error;
401 	int log_dev_bshift = ilog2(blksize);
402 	int fs_dev_bshift = log_dev_bshift;
403 	int run;
404 
405 	WAPBL_PRINTF(WAPBL_PRINT_OPEN, ("wapbl_start: vp=%p off=%" PRId64
406 	    " count=%zu blksize=%zu\n", vp, off, count, blksize));
407 
408 	if (log_dev_bshift > fs_dev_bshift) {
409 		WAPBL_PRINTF(WAPBL_PRINT_OPEN,
410 			("wapbl: log device's block size cannot be larger "
411 			 "than filesystem's\n"));
412 		/*
413 		 * Not currently implemented, although it could be if
414 		 * needed someday.
415 		 */
416 		return ENOSYS;
417 	}
418 
419 	if (off < 0)
420 		return EINVAL;
421 
422 	if (blksize < DEV_BSIZE)
423 		return EINVAL;
424 	if (blksize % DEV_BSIZE)
425 		return EINVAL;
426 
427 	/* XXXTODO: verify that the full load is writable */
428 
429 	/*
430 	 * XXX check for minimum log size
431 	 * minimum is governed by minimum amount of space
432 	 * to complete a transaction. (probably truncate)
433 	 */
434 	/* XXX for now pick something minimal */
435 	if ((count * blksize) < MAXPHYS) {
436 		return ENOSPC;
437 	}
438 
439 	if ((error = VOP_BMAP(vp, off, &devvp, &logpbn, &run)) != 0) {
440 		return error;
441 	}
442 
443 	wl = wapbl_calloc(1, sizeof(*wl));
444 	rw_init(&wl->wl_rwlock);
445 	mutex_init(&wl->wl_mtx, MUTEX_DEFAULT, IPL_NONE);
446 	cv_init(&wl->wl_reclaimable_cv, "wapblrec");
447 	LIST_INIT(&wl->wl_bufs);
448 	SIMPLEQ_INIT(&wl->wl_entries);
449 
450 	wl->wl_logvp = vp;
451 	wl->wl_devvp = devvp;
452 	wl->wl_mount = mp;
453 	wl->wl_logpbn = logpbn;
454 	wl->wl_log_dev_bshift = log_dev_bshift;
455 	wl->wl_fs_dev_bshift = fs_dev_bshift;
456 
457 	wl->wl_flush = flushfn;
458 	wl->wl_flush_abort = flushabortfn;
459 
460 	/* Reserve two log device blocks for the commit headers */
461 	wl->wl_circ_off = 2<<wl->wl_log_dev_bshift;
462 	wl->wl_circ_size = ((count * blksize) - wl->wl_circ_off);
463 	/* truncate the log usage to a multiple of log_dev_bshift */
464 	wl->wl_circ_size >>= wl->wl_log_dev_bshift;
465 	wl->wl_circ_size <<= wl->wl_log_dev_bshift;
466 
467 	/*
468 	 * wl_bufbytes_max limits the size of the in memory transaction space.
469 	 * - Since buffers are allocated and accounted for in units of
470 	 *   PAGE_SIZE it is required to be a multiple of PAGE_SIZE
471 	 *   (i.e. 1<<PAGE_SHIFT)
472 	 * - Since the log device has to be written in units of
473 	 *   1<<wl_log_dev_bshift, it is required to be a multiple of
474 	 *   1<<wl_log_dev_bshift.
475 	 * - Since filesystem will provide data in units of 1<<wl_fs_dev_bshift,
476 	 *   it is convenient to be a multiple of 1<<wl_fs_dev_bshift.
477 	 * Therefore it must be multiple of the least common multiple of those
478 	 * three quantities.  Fortunately, all of those quantities are
479 	 * guaranteed to be a power of two, and the least common multiple of
480 	 * a set of numbers which are all powers of two is simply the maximum
481 	 * of those numbers.  Finally, the maximum logarithm of a power of two
482 	 * is the same as the log of the maximum power of two.  So we can do
483 	 * the following operations to size wl_bufbytes_max:
484 	 */
485 
486 	/* XXX fix actual number of pages reserved per filesystem. */
487 	wl->wl_bufbytes_max = MIN(wl->wl_circ_size, buf_memcalc() / 2);
488 
489 	/* Round wl_bufbytes_max to the largest power of two constraint */
490 	wl->wl_bufbytes_max >>= PAGE_SHIFT;
491 	wl->wl_bufbytes_max <<= PAGE_SHIFT;
492 	wl->wl_bufbytes_max >>= wl->wl_log_dev_bshift;
493 	wl->wl_bufbytes_max <<= wl->wl_log_dev_bshift;
494 	wl->wl_bufbytes_max >>= wl->wl_fs_dev_bshift;
495 	wl->wl_bufbytes_max <<= wl->wl_fs_dev_bshift;
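
	/*
	 * For example (hypothetical values), with PAGE_SHIFT == 12 and
	 * both bshifts equal to 9, the shift pairs above round
	 * wl_bufbytes_max down to a multiple of max(1<<12, 1<<9, 1<<9)
	 * == 4096: rounding down to a multiple of a smaller power of
	 * two afterwards leaves a multiple of 4096 unchanged.
	 */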
496 
497 	/* XXX maybe use filesystem fragment size instead of 1024 */
498 	/* XXX fix actual number of buffers reserved per filesystem. */
499 	wl->wl_bufcount_max = (nbuf / 2) * 1024;
500 
501 	/* XXX tie this into resource estimation */
502 	wl->wl_dealloclim = wl->wl_bufbytes_max / mp->mnt_stat.f_bsize / 2;
503 
504 	wl->wl_deallocblks = wapbl_alloc(sizeof(*wl->wl_deallocblks) *
505 	    wl->wl_dealloclim);
506 	wl->wl_dealloclens = wapbl_alloc(sizeof(*wl->wl_dealloclens) *
507 	    wl->wl_dealloclim);
508 
509 	wl->wl_buffer = wapbl_alloc(MAXPHYS);
510 	wl->wl_buffer_used = 0;
511 
512 	wapbl_inodetrk_init(wl, WAPBL_INODETRK_SIZE);
513 
514 	/* Initialize the commit header */
515 	{
516 		struct wapbl_wc_header *wc;
517 		size_t len = 1 << wl->wl_log_dev_bshift;
518 		wc = wapbl_calloc(1, len);
519 		wc->wc_type = WAPBL_WC_HEADER;
520 		wc->wc_len = len;
521 		wc->wc_circ_off = wl->wl_circ_off;
522 		wc->wc_circ_size = wl->wl_circ_size;
523 		/* XXX wc->wc_fsid */
524 		wc->wc_log_dev_bshift = wl->wl_log_dev_bshift;
525 		wc->wc_fs_dev_bshift = wl->wl_fs_dev_bshift;
526 		wl->wl_wc_header = wc;
527 		wl->wl_wc_scratch = wapbl_alloc(len);
528 	}
529 
530 	/*
531 	 * if there was an existing set of unlinked but
532 	 * allocated inodes, preserve it in the new
533 	 * log.
534 	 */
535 	if (wr && wr->wr_inodescnt) {
536 		error = wapbl_start_flush_inodes(wl, wr);
537 		if (error)
538 			goto errout;
539 	}
540 
541 	error = wapbl_write_commit(wl, wl->wl_head, wl->wl_tail);
542 	if (error) {
543 		goto errout;
544 	}
545 
546 	*wlp = wl;
547 #if defined(WAPBL_DEBUG)
548 	wapbl_debug_wl = wl;
549 #endif
550 
551 	return 0;
552  errout:
553 	wapbl_discard(wl);
554 	wapbl_free(wl->wl_wc_scratch, wl->wl_wc_header->wc_len);
555 	wapbl_free(wl->wl_wc_header, wl->wl_wc_header->wc_len);
556 	wapbl_free(wl->wl_deallocblks,
557 	    sizeof(*wl->wl_deallocblks) * wl->wl_dealloclim);
558 	wapbl_free(wl->wl_dealloclens,
559 	    sizeof(*wl->wl_dealloclens) * wl->wl_dealloclim);
560 	wapbl_free(wl->wl_buffer, MAXPHYS);
561 	wapbl_inodetrk_free(wl);
562 	wapbl_free(wl, sizeof(*wl));
563 
564 	return error;
565 }
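
/*
 * Sketch (not part of the original file) of how a file system might
 * set up and tear down a log with wapbl_start/wapbl_stop.  The
 * example_* names are hypothetical; the callback prototypes follow
 * the wl_flush call sites in this file.
 */
#if 0
static void example_flush_cb(struct mount *, daddr_t *, int *, int);
static void example_flush_abort_cb(struct mount *, daddr_t *, int *, int);

static int
example_mount_log(struct mount *mp, struct vnode *logvp, daddr_t logstart,
    size_t logcount, size_t blksize, struct wapbl_replay *wr)
{
	struct wapbl *wl;
	int error;

	/* Create the in-memory log state and write an initial commit. */
	error = wapbl_start(&wl, mp, logvp, logstart, logcount, blksize,
	    wr, example_flush_cb, example_flush_abort_cb);
	if (error)
		return error;

	/*
	 * ... the file system stores wl and brackets metadata updates
	 * with wapbl_begin/wapbl_end ...
	 */

	/* At unmount: flush everything, discarding on failure. */
	return wapbl_stop(wl, 1 /* force */);
}
#endif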
566 
567 /*
568  * Like wapbl_flush, but discards the current transaction
569  * completely.
570  */
571 
572 void
573 wapbl_discard(struct wapbl *wl)
574 {
575 	struct wapbl_entry *we;
576 	struct buf *bp;
577 	int i;
578 
579 	/*
580 	 * XXX we may consider using upgrade here
581 	 * if we want to call flush from inside a transaction
582 	 */
583 	rw_enter(&wl->wl_rwlock, RW_WRITER);
584 	wl->wl_flush(wl->wl_mount, wl->wl_deallocblks, wl->wl_dealloclens,
585 	    wl->wl_dealloccnt);
586 
587 #ifdef WAPBL_DEBUG_PRINT
588 	{
589 		pid_t pid = -1;
590 		lwpid_t lid = -1;
591 		if (curproc)
592 			pid = curproc->p_pid;
593 		if (curlwp)
594 			lid = curlwp->l_lid;
595 #ifdef WAPBL_DEBUG_BUFBYTES
596 		WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
597 		    ("wapbl_discard: thread %d.%d discarding "
598 		    "transaction\n"
599 		    "\tbufcount=%zu bufbytes=%zu bcount=%zu "
600 		    "deallocs=%d inodes=%d\n"
601 		    "\terrcnt = %d, reclaimable=%zu reserved=%zu "
602 		    "unsynced=%zu\n",
603 		    pid, lid, wl->wl_bufcount, wl->wl_bufbytes,
604 		    wl->wl_bcount, wl->wl_dealloccnt,
605 		    wl->wl_inohashcnt, wl->wl_error_count,
606 		    wl->wl_reclaimable_bytes, wl->wl_reserved_bytes,
607 		    wl->wl_unsynced_bufbytes));
608 		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
609 			WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
610 			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
611 			     "error = %d, unsynced = %zu\n",
612 			     we->we_bufcount, we->we_reclaimable_bytes,
613 			     we->we_error, we->we_unsynced_bufbytes));
614 		}
615 #else /* !WAPBL_DEBUG_BUFBYTES */
616 		WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
617 		    ("wapbl_discard: thread %d.%d discarding transaction\n"
618 		    "\tbufcount=%zu bufbytes=%zu bcount=%zu "
619 		    "deallocs=%d inodes=%d\n"
620 		    "\terrcnt = %d, reclaimable=%zu reserved=%zu\n",
621 		    pid, lid, wl->wl_bufcount, wl->wl_bufbytes,
622 		    wl->wl_bcount, wl->wl_dealloccnt,
623 		    wl->wl_inohashcnt, wl->wl_error_count,
624 		    wl->wl_reclaimable_bytes, wl->wl_reserved_bytes));
625 		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
626 			WAPBL_PRINTF(WAPBL_PRINT_DISCARD,
627 			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
628 			     "error = %d\n",
629 			     we->we_bufcount, we->we_reclaimable_bytes,
630 			     we->we_error));
631 		}
632 #endif /* !WAPBL_DEBUG_BUFBYTES */
633 	}
634 #endif /* WAPBL_DEBUG_PRINT */
635 
636 	for (i = 0; i <= wl->wl_inohashmask; i++) {
637 		struct wapbl_ino_head *wih;
638 		struct wapbl_ino *wi;
639 
640 		wih = &wl->wl_inohash[i];
641 		while ((wi = LIST_FIRST(wih)) != NULL) {
642 			LIST_REMOVE(wi, wi_hash);
643 			pool_put(&wapbl_ino_pool, wi);
644 			KASSERT(wl->wl_inohashcnt > 0);
645 			wl->wl_inohashcnt--;
646 		}
647 	}
648 
649 	/*
650 	 * clean buffer list
651 	 */
652 	mutex_enter(&bufcache_lock);
653 	mutex_enter(&wl->wl_mtx);
654 	while ((bp = LIST_FIRST(&wl->wl_bufs)) != NULL) {
655 		if (bbusy(bp, 0, 0, &wl->wl_mtx) == 0) {
656 			/*
657 			 * The buffer will be unlocked and
658 			 * removed from the transaction in brelse
659 			 */
660 			mutex_exit(&wl->wl_mtx);
661 			brelsel(bp, 0);
662 			mutex_enter(&wl->wl_mtx);
663 		}
664 	}
665 	mutex_exit(&wl->wl_mtx);
666 	mutex_exit(&bufcache_lock);
667 
668 	/*
669 	 * Remove references to this wl from wl_entries, free any which
670 	 * no longer have buffers, others will be freed in wapbl_biodone
671 	 * when they no longer have any buffers.
672 	 */
673 	while ((we = SIMPLEQ_FIRST(&wl->wl_entries)) != NULL) {
674 		SIMPLEQ_REMOVE_HEAD(&wl->wl_entries, we_entries);
675 		/* XXX should we be accumulating wl_error_count
676 		 * and increasing reclaimable bytes ? */
677 		we->we_wapbl = NULL;
678 		if (we->we_bufcount == 0) {
679 #ifdef WAPBL_DEBUG_BUFBYTES
680 			KASSERT(we->we_unsynced_bufbytes == 0);
681 #endif
682 			pool_put(&wapbl_entry_pool, we);
683 		}
684 	}
685 
686 	/* Discard list of deallocs */
687 	wl->wl_dealloccnt = 0;
688 	/* XXX should we clear wl_reserved_bytes? */
689 
690 	KASSERT(wl->wl_bufbytes == 0);
691 	KASSERT(wl->wl_bcount == 0);
692 	KASSERT(wl->wl_bufcount == 0);
693 	KASSERT(LIST_EMPTY(&wl->wl_bufs));
694 	KASSERT(SIMPLEQ_EMPTY(&wl->wl_entries));
695 	KASSERT(wl->wl_inohashcnt == 0);
696 
697 	rw_exit(&wl->wl_rwlock);
698 }
699 
700 int
701 wapbl_stop(struct wapbl *wl, int force)
702 {
703 	int error;
704 
705 	WAPBL_PRINTF(WAPBL_PRINT_OPEN, ("wapbl_stop called\n"));
706 	error = wapbl_flush(wl, 1);
707 	if (error) {
708 		if (force)
709 			wapbl_discard(wl);
710 		else
711 			return error;
712 	}
713 
714 	/* Unlinked inodes persist after a flush */
715 	if (wl->wl_inohashcnt) {
716 		if (force) {
717 			wapbl_discard(wl);
718 		} else {
719 			return EBUSY;
720 		}
721 	}
722 
723 	KASSERT(wl->wl_bufbytes == 0);
724 	KASSERT(wl->wl_bcount == 0);
725 	KASSERT(wl->wl_bufcount == 0);
726 	KASSERT(LIST_EMPTY(&wl->wl_bufs));
727 	KASSERT(wl->wl_dealloccnt == 0);
728 	KASSERT(SIMPLEQ_EMPTY(&wl->wl_entries));
729 	KASSERT(wl->wl_inohashcnt == 0);
730 
731 	wapbl_free(wl->wl_wc_scratch, wl->wl_wc_header->wc_len);
732 	wapbl_free(wl->wl_wc_header, wl->wl_wc_header->wc_len);
733 	wapbl_free(wl->wl_deallocblks,
734 	    sizeof(*wl->wl_deallocblks) * wl->wl_dealloclim);
735 	wapbl_free(wl->wl_dealloclens,
736 	    sizeof(*wl->wl_dealloclens) * wl->wl_dealloclim);
737 	wapbl_free(wl->wl_buffer, MAXPHYS);
738 	wapbl_inodetrk_free(wl);
739 
740 	cv_destroy(&wl->wl_reclaimable_cv);
741 	mutex_destroy(&wl->wl_mtx);
742 	rw_destroy(&wl->wl_rwlock);
743 	wapbl_free(wl, sizeof(*wl));
744 
745 	return 0;
746 }
747 
748 /****************************************************************/
749 /*
750  * Unbuffered disk I/O
751  */
752 
753 static int
754 wapbl_doio(void *data, size_t len, struct vnode *devvp, daddr_t pbn, int flags)
755 {
756 	struct pstats *pstats = curlwp->l_proc->p_stats;
757 	struct buf *bp;
758 	int error;
759 
760 	KASSERT((flags & ~(B_WRITE | B_READ)) == 0);
761 	KASSERT(devvp->v_type == VBLK);
762 
763 	if ((flags & (B_WRITE | B_READ)) == B_WRITE) {
764 		mutex_enter(devvp->v_interlock);
765 		devvp->v_numoutput++;
766 		mutex_exit(devvp->v_interlock);
767 		pstats->p_ru.ru_oublock++;
768 	} else {
769 		pstats->p_ru.ru_inblock++;
770 	}
771 
772 	bp = getiobuf(devvp, true);
773 	bp->b_flags = flags;
774 	bp->b_cflags = BC_BUSY; /* silly & dubious */
775 	bp->b_dev = devvp->v_rdev;
776 	bp->b_data = data;
777 	bp->b_bufsize = bp->b_resid = bp->b_bcount = len;
778 	bp->b_blkno = pbn;
779 	BIO_SETPRIO(bp, BPRIO_TIMECRITICAL);
780 
781 	WAPBL_PRINTF(WAPBL_PRINT_IO,
782 	    ("wapbl_doio: %s %d bytes at block %"PRId64" on dev 0x%"PRIx64"\n",
783 	    BUF_ISWRITE(bp) ? "write" : "read", bp->b_bcount,
784 	    bp->b_blkno, bp->b_dev));
785 
786 	VOP_STRATEGY(devvp, bp);
787 
788 	error = biowait(bp);
789 	putiobuf(bp);
790 
791 	if (error) {
792 		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
793 		    ("wapbl_doio: %s %zu bytes at block %" PRId64
794 		    " on dev 0x%"PRIx64" failed with error %d\n",
795 		    (((flags & (B_WRITE | B_READ)) == B_WRITE) ?
796 		     "write" : "read"),
797 		    len, pbn, devvp->v_rdev, error));
798 	}
799 
800 	return error;
801 }
802 
803 /*
804  * wapbl_write(data, len, devvp, pbn)
805  *
806  *	Synchronously write len bytes from data to physical block pbn
807  *	on devvp.
808  */
809 int
810 wapbl_write(void *data, size_t len, struct vnode *devvp, daddr_t pbn)
811 {
812 
813 	return wapbl_doio(data, len, devvp, pbn, B_WRITE);
814 }
815 
816 /*
817  * wapbl_read(data, len, devvp, pbn)
818  *
819  *	Synchronously read len bytes into data from physical block pbn
820  *	on devvp.
821  */
822 int
823 wapbl_read(void *data, size_t len, struct vnode *devvp, daddr_t pbn)
824 {
825 
826 	return wapbl_doio(data, len, devvp, pbn, B_READ);
827 }
828 
829 /****************************************************************/
830 /*
831  * Buffered disk writes -- try to coalesce writes and emit
832  * MAXPHYS-aligned blocks.
833  */
834 
835 /*
836  * wapbl_buffered_flush(wl)
837  *
838  *	Flush any buffered writes from wapbl_buffered_write.
839  */
840 static int
841 wapbl_buffered_flush(struct wapbl *wl)
842 {
843 	int error;
844 
845 	if (wl->wl_buffer_used == 0)
846 		return 0;
847 
848 	error = wapbl_doio(wl->wl_buffer, wl->wl_buffer_used,
849 	    wl->wl_devvp, wl->wl_buffer_dblk, B_WRITE);
850 	wl->wl_buffer_used = 0;
851 
852 	return error;
853 }
854 
855 /*
856  * wapbl_buffered_write(data, len, wl, pbn)
857  *
858  *	Write len bytes from data to physical block pbn on
859  *	wl->wl_devvp.  The write may not complete until
860  *	wapbl_buffered_flush.
861  */
862 static int
863 wapbl_buffered_write(void *data, size_t len, struct wapbl *wl, daddr_t pbn)
864 {
865 	int error;
866 	size_t resid;
867 
868 	/*
869 	 * If not adjacent to the buffered data, flush first.  The disk
870 	 * block address is always valid for a non-empty buffer.
871 	 */
872 	if (wl->wl_buffer_used > 0 &&
873 	    pbn != wl->wl_buffer_dblk + btodb(wl->wl_buffer_used)) {
874 		error = wapbl_buffered_flush(wl);
875 		if (error)
876 			return error;
877 	}
878 	/*
879 	 * If this write goes to an empty buffer we have to
880 	 * save the disk block address first.
881 	 */
882 	if (wl->wl_buffer_used == 0)
883 		wl->wl_buffer_dblk = pbn;
884 	/*
885 	 * Remaining space so this buffer ends on a MAXPHYS boundary.
886 	 *
887 	 * It cannot become less than or equal to zero, since the buffer
888 	 * would then have been flushed by the previous call.
889 	 */
890 	resid = MAXPHYS - dbtob(wl->wl_buffer_dblk % btodb(MAXPHYS)) -
891 	    wl->wl_buffer_used;
892 	KASSERT(resid > 0);
893 	KASSERT(dbtob(btodb(resid)) == resid);
894 	if (len >= resid) {
895 		memcpy(wl->wl_buffer + wl->wl_buffer_used, data, resid);
896 		wl->wl_buffer_used += resid;
897 		error = wapbl_doio(wl->wl_buffer, wl->wl_buffer_used,
898 		    wl->wl_devvp, wl->wl_buffer_dblk, B_WRITE);
899 		data = (uint8_t *)data + resid;
900 		len -= resid;
901 		wl->wl_buffer_dblk = pbn + btodb(resid);
902 		wl->wl_buffer_used = 0;
903 		if (error)
904 			return error;
905 	}
906 	KASSERT(len < MAXPHYS);
907 	if (len > 0) {
908 		memcpy(wl->wl_buffer + wl->wl_buffer_used, data, len);
909 		wl->wl_buffer_used += len;
910 	}
911 
912 	return 0;
913 }
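
/*
 * Worked example of the boundary arithmetic above, assuming
 * MAXPHYS == 65536 and DEV_BSIZE == 512 (so btodb(MAXPHYS) == 128):
 * a write starting at pbn 100 with an empty buffer gives
 * resid = 65536 - dbtob(100 % 128) - 0 = 65536 - 51200 = 14336, so
 * the first chunk emitted is 14336 bytes and ends exactly on the
 * next MAXPHYS boundary (disk block 128).
 */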
914 
915 /*
916  * wapbl_circ_write(wl, data, len, offp)
917  *
918  *	Write len bytes from data to the circular queue of wl, starting
919  *	at linear byte offset *offp, and returning the new linear byte
920  *	offset in *offp.
921  *
922  *	If the starting linear byte offset precedes wl->wl_circ_off,
923  *	the write instead begins at wl->wl_circ_off.  XXX WTF?  This
924  *	should be a KASSERT, not a conditional.
925  *
926  *	The write is buffered in wl and must be flushed with
927  *	wapbl_buffered_flush before it will be submitted to the disk.
928  */
929 static int
930 wapbl_circ_write(struct wapbl *wl, void *data, size_t len, off_t *offp)
931 {
932 	size_t slen;
933 	off_t off = *offp;
934 	int error;
935 	daddr_t pbn;
936 
937 	KDASSERT(((len >> wl->wl_log_dev_bshift) <<
938 	    wl->wl_log_dev_bshift) == len);
939 
940 	if (off < wl->wl_circ_off)
941 		off = wl->wl_circ_off;
942 	slen = wl->wl_circ_off + wl->wl_circ_size - off;
943 	if (slen < len) {
944 		pbn = wl->wl_logpbn + (off >> wl->wl_log_dev_bshift);
945 #ifdef _KERNEL
946 		pbn = btodb(pbn << wl->wl_log_dev_bshift);
947 #endif
948 		error = wapbl_buffered_write(data, slen, wl, pbn);
949 		if (error)
950 			return error;
951 		data = (uint8_t *)data + slen;
952 		len -= slen;
953 		off = wl->wl_circ_off;
954 	}
955 	pbn = wl->wl_logpbn + (off >> wl->wl_log_dev_bshift);
956 #ifdef _KERNEL
957 	pbn = btodb(pbn << wl->wl_log_dev_bshift);
958 #endif
959 	error = wapbl_buffered_write(data, len, wl, pbn);
960 	if (error)
961 		return error;
962 	off += len;
963 	if (off >= wl->wl_circ_off + wl->wl_circ_size)
964 		off = wl->wl_circ_off;
965 	*offp = off;
966 	return 0;
967 }
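
/*
 * For example, with the hypothetical geometry wl_circ_off = 1024 and
 * wl_circ_size = 8192 (queue spans [1024, 9216)), a 2048-byte
 * wapbl_circ_write starting at *offp = 8704 is split: slen =
 * 1024 + 8192 - 8704 = 512 bytes are written at offset 8704, the
 * remaining 1536 bytes continue at wl_circ_off, and *offp comes back
 * as 1024 + 1536 = 2560.
 */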
968 
969 /****************************************************************/
970 /*
971  * WAPBL transactions: entering, adding/removing bufs, and exiting
972  */
973 
974 int
975 wapbl_begin(struct wapbl *wl, const char *file, int line)
976 {
977 	int doflush;
978 	unsigned lockcount;
979 
980 	KDASSERT(wl);
981 
982 	/*
983 	 * XXX this needs to be made much more sophisticated.
984 	 * perhaps each wapbl_begin could reserve a specified
985 	 * number of buffers and bytes.
986 	 */
987 	mutex_enter(&wl->wl_mtx);
988 	lockcount = wl->wl_lock_count;
989 	doflush = ((wl->wl_bufbytes + (lockcount * MAXPHYS)) >
990 		   wl->wl_bufbytes_max / 2) ||
991 		  ((wl->wl_bufcount + (lockcount * 10)) >
992 		   wl->wl_bufcount_max / 2) ||
993 		  (wapbl_transaction_len(wl) > wl->wl_circ_size / 2) ||
994 		  (wl->wl_dealloccnt >= (wl->wl_dealloclim / 2));
995 	mutex_exit(&wl->wl_mtx);
996 
997 	if (doflush) {
998 		WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
999 		    ("force flush lockcnt=%d bufbytes=%zu "
1000 		    "(max=%zu) bufcount=%zu (max=%zu) "
1001 		    "dealloccnt %d (lim=%d)\n",
1002 		    lockcount, wl->wl_bufbytes,
1003 		    wl->wl_bufbytes_max, wl->wl_bufcount,
1004 		    wl->wl_bufcount_max,
1005 		    wl->wl_dealloccnt, wl->wl_dealloclim));
1006 	}
1007 
1008 	if (doflush) {
1009 		int error = wapbl_flush(wl, 0);
1010 		if (error)
1011 			return error;
1012 	}
1013 
1014 	rw_enter(&wl->wl_rwlock, RW_READER);
1015 	mutex_enter(&wl->wl_mtx);
1016 	wl->wl_lock_count++;
1017 	mutex_exit(&wl->wl_mtx);
1018 
1019 #if defined(WAPBL_DEBUG_PRINT)
1020 	WAPBL_PRINTF(WAPBL_PRINT_TRANSACTION,
1021 	    ("wapbl_begin thread %d.%d with bufcount=%zu "
1022 	    "bufbytes=%zu bcount=%zu at %s:%d\n",
1023 	    curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
1024 	    wl->wl_bufbytes, wl->wl_bcount, file, line));
1025 #endif
1026 
1027 	return 0;
1028 }
1029 
1030 void
1031 wapbl_end(struct wapbl *wl)
1032 {
1033 
1034 #if defined(WAPBL_DEBUG_PRINT)
1035 	WAPBL_PRINTF(WAPBL_PRINT_TRANSACTION,
1036 	     ("wapbl_end thread %d.%d with bufcount=%zu "
1037 	      "bufbytes=%zu bcount=%zu\n",
1038 	      curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
1039 	      wl->wl_bufbytes, wl->wl_bcount));
1040 #endif
1041 
1042 	/*
1043 	 * XXX this could be handled more gracefully, perhaps place
1044 	 * only a partial transaction in the log and allow the
1045 	 * remaining to flush without the protection of the journal.
1046 	 */
1047 	KASSERTMSG((wapbl_transaction_len(wl) <=
1048 		(wl->wl_circ_size - wl->wl_reserved_bytes)),
1049 	    "wapbl_end: current transaction too big to flush");
1050 
1051 	mutex_enter(&wl->wl_mtx);
1052 	KASSERT(wl->wl_lock_count > 0);
1053 	wl->wl_lock_count--;
1054 	mutex_exit(&wl->wl_mtx);
1055 
1056 	rw_exit(&wl->wl_rwlock);
1057 }
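
/*
 * Sketch (not part of the original file) of the transaction bracket
 * a file system operation would place around metadata updates;
 * "example_metadata_update" is hypothetical.  Buffers dirtied inside
 * the bracket join the transaction via wapbl_add_buf and reach the
 * log in a later wapbl_flush.
 */
#if 0
static int
example_metadata_update(struct wapbl *wl)
{
	int error;

	error = wapbl_begin(wl, __FILE__, __LINE__);
	if (error)
		return error;

	/* ... dirty and release metadata buffers here ... */

	wapbl_end(wl);
	return 0;
}
#endif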
1058 
1059 void
1060 wapbl_add_buf(struct wapbl *wl, struct buf * bp)
1061 {
1062 
1063 	KASSERT(bp->b_cflags & BC_BUSY);
1064 	KASSERT(bp->b_vp);
1065 
1066 	wapbl_jlock_assert(wl);
1067 
1068 #if 0
1069 	/*
1070 	 * XXX this might be an issue for swapfiles.
1071 	 * see uvm_swap.c:1702
1072 	 *
1073 	 * XXX2 why require it then?  leap of semantics?
1074 	 */
1075 	KASSERT((bp->b_cflags & BC_NOCACHE) == 0);
1076 #endif
1077 
1078 	mutex_enter(&wl->wl_mtx);
1079 	if (bp->b_flags & B_LOCKED) {
1080 		LIST_REMOVE(bp, b_wapbllist);
1081 		WAPBL_PRINTF(WAPBL_PRINT_BUFFER2,
1082 		   ("wapbl_add_buf thread %d.%d re-adding buf %p "
1083 		    "with %d bytes %d bcount\n",
1084 		    curproc->p_pid, curlwp->l_lid, bp, bp->b_bufsize,
1085 		    bp->b_bcount));
1086 	} else {
1087 		/* unlocked but dirty buffers shouldn't exist */
1088 		KASSERT(!(bp->b_oflags & BO_DELWRI));
1089 		wl->wl_bufbytes += bp->b_bufsize;
1090 		wl->wl_bcount += bp->b_bcount;
1091 		wl->wl_bufcount++;
1092 		WAPBL_PRINTF(WAPBL_PRINT_BUFFER,
1093 		   ("wapbl_add_buf thread %d.%d adding buf %p "
1094 		    "with %d bytes %d bcount\n",
1095 		    curproc->p_pid, curlwp->l_lid, bp, bp->b_bufsize,
1096 		    bp->b_bcount));
1097 	}
1098 	LIST_INSERT_HEAD(&wl->wl_bufs, bp, b_wapbllist);
1099 	mutex_exit(&wl->wl_mtx);
1100 
1101 	bp->b_flags |= B_LOCKED;
1102 }
1103 
1104 static void
1105 wapbl_remove_buf_locked(struct wapbl * wl, struct buf *bp)
1106 {
1107 
1108 	KASSERT(mutex_owned(&wl->wl_mtx));
1109 	KASSERT(bp->b_cflags & BC_BUSY);
1110 	wapbl_jlock_assert(wl);
1111 
1112 #if 0
1113 	/*
1114 	 * XXX this might be an issue for swapfiles.
1115 	 * see uvm_swap.c:1725
1116 	 *
1117 	 * XXXdeux: see above
1118 	 */
1119 	KASSERT((bp->b_flags & BC_NOCACHE) == 0);
1120 #endif
1121 	KASSERT(bp->b_flags & B_LOCKED);
1122 
1123 	WAPBL_PRINTF(WAPBL_PRINT_BUFFER,
1124 	   ("wapbl_remove_buf thread %d.%d removing buf %p with "
1125 	    "%d bytes %d bcount\n",
1126 	    curproc->p_pid, curlwp->l_lid, bp, bp->b_bufsize, bp->b_bcount));
1127 
1128 	KASSERT(wl->wl_bufbytes >= bp->b_bufsize);
1129 	wl->wl_bufbytes -= bp->b_bufsize;
1130 	KASSERT(wl->wl_bcount >= bp->b_bcount);
1131 	wl->wl_bcount -= bp->b_bcount;
1132 	KASSERT(wl->wl_bufcount > 0);
1133 	wl->wl_bufcount--;
1134 	KASSERT((wl->wl_bufcount == 0) == (wl->wl_bufbytes == 0));
1135 	KASSERT((wl->wl_bufcount == 0) == (wl->wl_bcount == 0));
1136 	LIST_REMOVE(bp, b_wapbllist);
1137 
1138 	bp->b_flags &= ~B_LOCKED;
1139 }
1140 
1141 /* called from brelsel() in vfs_bio among other places */
1142 void
1143 wapbl_remove_buf(struct wapbl * wl, struct buf *bp)
1144 {
1145 
1146 	mutex_enter(&wl->wl_mtx);
1147 	wapbl_remove_buf_locked(wl, bp);
1148 	mutex_exit(&wl->wl_mtx);
1149 }
1150 
1151 void
1152 wapbl_resize_buf(struct wapbl *wl, struct buf *bp, long oldsz, long oldcnt)
1153 {
1154 
1155 	KASSERT(bp->b_cflags & BC_BUSY);
1156 
1157 	/*
1158 	 * XXX: why does this depend on B_LOCKED?  otherwise the buf
1159 	 * is not for a transaction?  if so, why is this called in the
1160 	 * first place?
1161 	 */
1162 	if (bp->b_flags & B_LOCKED) {
1163 		mutex_enter(&wl->wl_mtx);
1164 		wl->wl_bufbytes += bp->b_bufsize - oldsz;
1165 		wl->wl_bcount += bp->b_bcount - oldcnt;
1166 		mutex_exit(&wl->wl_mtx);
1167 	}
1168 }
1169 
1170 #endif /* _KERNEL */
1171 
1172 /****************************************************************/
1173 /* Some utility inlines */
1174 
1175 /*
1176  * wapbl_space_used(avail, head, tail)
1177  *
1178  *	Number of bytes used in a circular queue of avail total bytes,
1179  *	from tail to head.
1180  */
1181 static inline size_t
1182 wapbl_space_used(size_t avail, off_t head, off_t tail)
1183 {
1184 
1185 	if (tail == 0) {
1186 		KASSERT(head == 0);
1187 		return 0;
1188 	}
1189 	return ((head + (avail - 1) - tail) % avail) + 1;
1190 }
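
/*
 * For example, with avail = 1000, head = 300, tail = 200 the formula
 * gives ((300 + 999 - 200) % 1000) + 1 = 100 bytes used; with
 * head == tail != 0 it yields avail, i.e. a full queue, matching the
 * conventions described in struct wapbl.
 */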
1191 
1192 #ifdef _KERNEL
1193 /*
1194  * wapbl_advance(size, off, oldoff, delta)
1195  *
1196  *	Given a byte offset oldoff into a circular queue of size bytes
1197  *	starting at off, return a new byte offset oldoff + delta into
1198  *	the circular queue.
1199  */
1200 static inline off_t
1201 wapbl_advance(size_t size, size_t off, off_t oldoff, size_t delta)
1202 {
1203 	off_t newoff;
1204 
1205 	/* Define acceptable ranges for inputs. */
1206 	KASSERT(delta <= (size_t)size);
1207 	KASSERT((oldoff == 0) || ((size_t)oldoff >= off));
1208 	KASSERT(oldoff < (off_t)(size + off));
1209 
1210 	if ((oldoff == 0) && (delta != 0))
1211 		newoff = off + delta;
1212 	else if ((oldoff + delta) < (size + off))
1213 		newoff = oldoff + delta;
1214 	else
1215 		newoff = (oldoff + delta) - size;
1216 
1217 	/* Note some interesting axioms */
1218 	KASSERT((delta != 0) || (newoff == oldoff));
1219 	KASSERT((delta == 0) || (newoff != 0));
1220 	KASSERT((delta != (size)) || (newoff == oldoff));
1221 
1222 	/* Define acceptable ranges for output. */
1223 	KASSERT((newoff == 0) || ((size_t)newoff >= off));
1224 	KASSERT((size_t)newoff < (size + off));
1225 	return newoff;
1226 }
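
/*
 * For example, with size = 8192 and off = 1024, advancing
 * oldoff = 9000 by delta = 500 wraps: 9000 + 500 == 9500 is
 * >= size + off == 9216, so newoff = 9500 - 8192 = 1308.  Advancing
 * the empty-queue offset 0 by any nonzero delta lands at off + delta.
 */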
1227 
1228 /*
1229  * wapbl_space_free(avail, head, tail)
1230  *
1231  *	Number of bytes free in a circular queue of avail total bytes,
1232  *	in which everything from tail to head is used.
1233  */
1234 static inline size_t
1235 wapbl_space_free(size_t avail, off_t head, off_t tail)
1236 {
1237 
1238 	return avail - wapbl_space_used(avail, head, tail);
1239 }
1240 
1241 /*
1242  * wapbl_advance_head(size, off, delta, headp, tailp)
1243  *
1244  *	In a circular queue of size bytes starting at off, given the
1245  *	old head and tail offsets *headp and *tailp, store the new head
1246  *	and tail offsets in *headp and *tailp resulting from adding
1247  *	delta bytes of data to the head.
1248  */
1249 static inline void
1250 wapbl_advance_head(size_t size, size_t off, size_t delta, off_t *headp,
1251 		   off_t *tailp)
1252 {
1253 	off_t head = *headp;
1254 	off_t tail = *tailp;
1255 
1256 	KASSERT(delta <= wapbl_space_free(size, head, tail));
1257 	head = wapbl_advance(size, off, head, delta);
1258 	if ((tail == 0) && (head != 0))
1259 		tail = off;
1260 	*headp = head;
1261 	*tailp = tail;
1262 }
1263 
1264 /*
1265  * wapbl_advance_tail(size, off, delta, headp, tailp)
1266  *
1267  *	In a circular queue of size bytes starting at off, given the
1268  *	old head and tail offsets *headp and *tailp, store the new head
1269  *	and tail offsets in *headp and *tailp resulting from removing
1270  *	delta bytes of data from the tail.
1271  */
1272 static inline void
1273 wapbl_advance_tail(size_t size, size_t off, size_t delta, off_t *headp,
1274 		   off_t *tailp)
1275 {
1276 	off_t head = *headp;
1277 	off_t tail = *tailp;
1278 
1279 	KASSERT(delta <= wapbl_space_used(size, head, tail));
1280 	tail = wapbl_advance(size, off, tail, delta);
1281 	if (head == tail) {
1282 		head = tail = 0;
1283 	}
1284 	*headp = head;
1285 	*tailp = tail;
1286 }
1287 
1288 
1289 /****************************************************************/
1290 
1291 /*
1292  * wapbl_truncate(wl, minfree)
1293  *
1294  *	Wait until at least minfree bytes are available in the log.
1295  *
1296  *	If it was necessary to wait for writes to complete,
1297  *	advance the circular queue tail to reflect the new write
1298  *	completions and issue a write commit to the log.
1299  *
1300  *	=> Caller must hold wl->wl_rwlock writer lock.
1301  */
1302 static int
1303 wapbl_truncate(struct wapbl *wl, size_t minfree)
1304 {
1305 	size_t delta;
1306 	size_t avail;
1307 	off_t head;
1308 	off_t tail;
1309 	int error = 0;
1310 
1311 	KASSERT(minfree <= (wl->wl_circ_size - wl->wl_reserved_bytes));
1312 	KASSERT(rw_write_held(&wl->wl_rwlock));
1313 
1314 	mutex_enter(&wl->wl_mtx);
1315 
1316 	/*
1317 	 * First check to see if we have to do a commit
1318 	 * at all.
1319 	 */
1320 	avail = wapbl_space_free(wl->wl_circ_size, wl->wl_head, wl->wl_tail);
1321 	if (minfree < avail) {
1322 		mutex_exit(&wl->wl_mtx);
1323 		return 0;
1324 	}
1325 	minfree -= avail;
1326 	while ((wl->wl_error_count == 0) &&
1327 	    (wl->wl_reclaimable_bytes < minfree)) {
1328         	WAPBL_PRINTF(WAPBL_PRINT_TRUNCATE,
1329                    ("wapbl_truncate: sleeping on %p wl=%p bytes=%zd "
1330 		    "minfree=%zd\n",
1331                     &wl->wl_reclaimable_bytes, wl, wl->wl_reclaimable_bytes,
1332 		    minfree));
1333 
1334 		cv_wait(&wl->wl_reclaimable_cv, &wl->wl_mtx);
1335 	}
1336 	if (wl->wl_reclaimable_bytes < minfree) {
1337 		KASSERT(wl->wl_error_count);
1338 		/* XXX maybe get actual error from buffer instead someday? */
1339 		error = EIO;
1340 	}
1341 	head = wl->wl_head;
1342 	tail = wl->wl_tail;
1343 	delta = wl->wl_reclaimable_bytes;
1344 
1345 	/* If all of the entries are flushed, then be sure to keep
1346 	 * the reserved bytes reserved.  Watch out for discarded transactions,
1347 	 * which could leave more bytes reserved than are reclaimable.
1348 	 */
1349 	if (SIMPLEQ_EMPTY(&wl->wl_entries) &&
1350 	    (delta >= wl->wl_reserved_bytes)) {
1351 		delta -= wl->wl_reserved_bytes;
1352 	}
1353 	wapbl_advance_tail(wl->wl_circ_size, wl->wl_circ_off, delta, &head,
1354 			   &tail);
1355 	KDASSERT(wl->wl_reserved_bytes <=
1356 		wapbl_space_used(wl->wl_circ_size, head, tail));
1357 	mutex_exit(&wl->wl_mtx);
1358 
1359 	if (error)
1360 		return error;
1361 
1362 	/*
1363 	 * This is where head, tail, and delta are unprotected
1364 	 * from races with flush or with truncate itself.  This is ok since
1365 	 * we only call this routine from inside flush itself.
1366 	 *
1367 	 * XXX: how can it race against itself when accessed only
1368 	 * from behind the write-locked rwlock?
1369 	 */
1370 	error = wapbl_write_commit(wl, head, tail);
1371 	if (error)
1372 		return error;
1373 
1374 	wl->wl_head = head;
1375 	wl->wl_tail = tail;
1376 
1377 	mutex_enter(&wl->wl_mtx);
1378 	KASSERT(wl->wl_reclaimable_bytes >= delta);
1379 	wl->wl_reclaimable_bytes -= delta;
1380 	mutex_exit(&wl->wl_mtx);
1381 	WAPBL_PRINTF(WAPBL_PRINT_TRUNCATE,
1382 	    ("wapbl_truncate thread %d.%d truncating %zu bytes\n",
1383 	    curproc->p_pid, curlwp->l_lid, delta));
1384 
1385 	return 0;
1386 }
1387 
1388 /****************************************************************/
1389 
1390 void
1391 wapbl_biodone(struct buf *bp)
1392 {
1393 	struct wapbl_entry *we = bp->b_private;
1394 	struct wapbl *wl = we->we_wapbl;
1395 #ifdef WAPBL_DEBUG_BUFBYTES
1396 	const int bufsize = bp->b_bufsize;
1397 #endif
1398 
1399 	/*
1400 	 * Handle possible flushing of buffers after the log has been
1401 	 * decommissioned.
1402 	 */
1403 	if (!wl) {
1404 		KASSERT(we->we_bufcount > 0);
1405 		we->we_bufcount--;
1406 #ifdef WAPBL_DEBUG_BUFBYTES
1407 		KASSERT(we->we_unsynced_bufbytes >= bufsize);
1408 		we->we_unsynced_bufbytes -= bufsize;
1409 #endif
1410 
1411 		if (we->we_bufcount == 0) {
1412 #ifdef WAPBL_DEBUG_BUFBYTES
1413 			KASSERT(we->we_unsynced_bufbytes == 0);
1414 #endif
1415 			pool_put(&wapbl_entry_pool, we);
1416 		}
1417 
1418 		brelse(bp, 0);
1419 		return;
1420 	}
1421 
1422 #ifdef ohbother
1423 	KDASSERT(bp->b_oflags & BO_DONE);
1424 	KDASSERT(!(bp->b_oflags & BO_DELWRI));
1425 	KDASSERT(bp->b_flags & B_ASYNC);
1426 	KDASSERT(bp->b_cflags & BC_BUSY);
1427 	KDASSERT(!(bp->b_flags & B_LOCKED));
1428 	KDASSERT(!(bp->b_flags & B_READ));
1429 	KDASSERT(!(bp->b_cflags & BC_INVAL));
1430 	KDASSERT(!(bp->b_cflags & BC_NOCACHE));
1431 #endif
1432 
1433 	if (bp->b_error) {
1434 		/*
1435 		 * If an error occurs, it would be nice to leave the buffer
1436 		 * as a delayed write on the LRU queue so that we can retry
1437 		 * it later. But buffercache(9) can't handle dirty buffer
1438 		 * reuse, so just mark the log permanently errored out.
1439 		 */
1440 		mutex_enter(&wl->wl_mtx);
1441 		if (wl->wl_error_count == 0) {
1442 			wl->wl_error_count++;
1443 			cv_broadcast(&wl->wl_reclaimable_cv);
1444 		}
1445 		mutex_exit(&wl->wl_mtx);
1446 	}
1447 
1448 	/*
1449 	 * Release the buffer here. wapbl_flush() may wait for the
1450 	 * log to become empty and we better unbusy the buffer before
1451 	 * wapbl_flush() returns.
1452 	 */
1453 	brelse(bp, 0);
1454 
1455 	mutex_enter(&wl->wl_mtx);
1456 
1457 	KASSERT(we->we_bufcount > 0);
1458 	we->we_bufcount--;
1459 #ifdef WAPBL_DEBUG_BUFBYTES
1460 	KASSERT(we->we_unsynced_bufbytes >= bufsize);
1461 	we->we_unsynced_bufbytes -= bufsize;
1462 	KASSERT(wl->wl_unsynced_bufbytes >= bufsize);
1463 	wl->wl_unsynced_bufbytes -= bufsize;
1464 #endif
1465 
1466 	/*
1467 	 * If the current transaction can be reclaimed, start
1468 	 * at the beginning and reclaim any consecutive reclaimable
1469 	 * transactions.  If we successfully reclaim anything,
1470 	 * then wakeup anyone waiting for the reclaim.
1471 	 */
1472 	if (we->we_bufcount == 0) {
1473 		size_t delta = 0;
1474 		int errcnt = 0;
1475 #ifdef WAPBL_DEBUG_BUFBYTES
1476 		KDASSERT(we->we_unsynced_bufbytes == 0);
1477 #endif
1478 		/*
1479 		 * Clear any posted error, since the buffer it came from
1480 		 * has been successfully flushed by now.
1481 		 */
1482 		while ((we = SIMPLEQ_FIRST(&wl->wl_entries)) &&
1483 		       (we->we_bufcount == 0)) {
1484 			delta += we->we_reclaimable_bytes;
1485 			if (we->we_error)
1486 				errcnt++;
1487 			SIMPLEQ_REMOVE_HEAD(&wl->wl_entries, we_entries);
1488 			pool_put(&wapbl_entry_pool, we);
1489 		}
1490 
1491 		if (delta) {
1492 			wl->wl_reclaimable_bytes += delta;
1493 			KASSERT(wl->wl_error_count >= errcnt);
1494 			wl->wl_error_count -= errcnt;
1495 			cv_broadcast(&wl->wl_reclaimable_cv);
1496 		}
1497 	}
1498 
1499 	mutex_exit(&wl->wl_mtx);
1500 }
1501 
1502 /*
1503  * wapbl_flush(wl, wait)
1504  *
1505  *	Flush pending block writes, deallocations, and inodes from
1506  *	the current transaction in memory to the log on disk:
1507  *
1508  *	1. Call the file system's wl_flush callback to flush any
1509  *	   per-file-system pending updates.
1510  *	2. Wait for enough space in the log for the current transaction.
1511  *	3. Synchronously write the new log records, advancing the
1512  *	   circular queue head.
1513  *	4. Issue the pending block writes asynchronously, now that they
1514  *	   are recorded in the log and can be replayed after crash.
1515  *	5. If wait is true, wait for all writes to complete and for the
1516  *	   log to become empty.
1517  *
1518  *	On failure, call the file system's wl_flush_abort callback.
1519  */
1520 int
1521 wapbl_flush(struct wapbl *wl, int waitfor)
1522 {
1523 	struct buf *bp;
1524 	struct wapbl_entry *we;
1525 	off_t off;
1526 	off_t head;
1527 	off_t tail;
1528 	size_t delta = 0;
1529 	size_t flushsize;
1530 	size_t reserved;
1531 	int error = 0;
1532 
1533 	/*
1534 	 * Do a quick check to see if a full flush can be skipped.
1535 	 * This assumes that the flush callback does not need to be called
1536 	 * unless there are other outstanding bufs.
1537 	 */
1538 	if (!waitfor) {
1539 		size_t nbufs;
1540 		mutex_enter(&wl->wl_mtx);	/* XXX need mutex here to
1541 						   protect the KASSERTS */
1542 		nbufs = wl->wl_bufcount;
1543 		KASSERT((wl->wl_bufcount == 0) == (wl->wl_bufbytes == 0));
1544 		KASSERT((wl->wl_bufcount == 0) == (wl->wl_bcount == 0));
1545 		mutex_exit(&wl->wl_mtx);
1546 		if (nbufs == 0)
1547 			return 0;
1548 	}
1549 
1550 	/*
1551 	 * XXX we may consider using LK_UPGRADE here
1552 	 * if we want to call flush from inside a transaction
1553 	 */
1554 	rw_enter(&wl->wl_rwlock, RW_WRITER);
1555 	wl->wl_flush(wl->wl_mount, wl->wl_deallocblks, wl->wl_dealloclens,
1556 	    wl->wl_dealloccnt);
1557 
1558 	/*
1559 	 * Now that we are exclusively locked and the file system has
1560 	 * issued any deferred block writes for this transaction, check
1561 	 * whether there are any blocks to write to the log.  If not,
1562 	 * skip waiting for space or writing any log entries.
1563 	 *
1564 	 * XXX Shouldn't this also check wl_dealloccnt and
1565 	 * wl_inohashcnt?  Perhaps wl_dealloccnt doesn't matter if the
1566 	 * file system didn't produce any blocks as a consequence of
1567 	 * it, but the same does not seem to be so of wl_inohashcnt.
1568 	 */
1569 	if (wl->wl_bufcount == 0) {
1570 		goto wait_out;
1571 	}
1572 
1573 #if 0
1574 	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
1575 		     ("wapbl_flush thread %d.%d flushing entries with "
1576 		      "bufcount=%zu bufbytes=%zu\n",
1577 		      curproc->p_pid, curlwp->l_lid, wl->wl_bufcount,
1578 		      wl->wl_bufbytes));
1579 #endif
1580 
1581 	/* Calculate amount of space needed to flush */
1582 	flushsize = wapbl_transaction_len(wl);
1583 	if (wapbl_verbose_commit) {
1584 		struct timespec ts;
1585 		getnanotime(&ts);
1586 		printf("%s: %lld.%09ld this transaction = %zu bytes\n",
1587 		    __func__, (long long)ts.tv_sec,
1588 		    (long)ts.tv_nsec, flushsize);
1589 	}
1590 
1591 	if (flushsize > (wl->wl_circ_size - wl->wl_reserved_bytes)) {
1592 		/*
1593 		 * XXX this could be handled more gracefully, perhaps place
1594 		 * only a partial transaction in the log and allow the
1595 		 * remaining to flush without the protection of the journal.
1596 		 */
1597 		panic("wapbl_flush: current transaction too big to flush");
1598 	}
1599 
1600 	error = wapbl_truncate(wl, flushsize);
1601 	if (error)
1602 		goto out;
1603 
1604 	off = wl->wl_head;
1605 	KASSERT((off == 0) || (off >= wl->wl_circ_off));
1606 	KASSERT((off == 0) || (off < wl->wl_circ_off + wl->wl_circ_size));
1607 	error = wapbl_write_blocks(wl, &off);
1608 	if (error)
1609 		goto out;
1610 	error = wapbl_write_revocations(wl, &off);
1611 	if (error)
1612 		goto out;
1613 	error = wapbl_write_inodes(wl, &off);
1614 	if (error)
1615 		goto out;
1616 
1617 	reserved = 0;
1618 	if (wl->wl_inohashcnt)
1619 		reserved = wapbl_transaction_inodes_len(wl);
1620 
1621 	head = wl->wl_head;
1622 	tail = wl->wl_tail;
1623 
1624 	wapbl_advance_head(wl->wl_circ_size, wl->wl_circ_off, flushsize,
1625 	    &head, &tail);
1626 
1627 	KASSERTMSG(head == off,
1628 	    "lost head! head=%"PRIdMAX" tail=%" PRIdMAX
1629 	    " off=%"PRIdMAX" flush=%zu",
1630 	    (intmax_t)head, (intmax_t)tail, (intmax_t)off,
1631 	    flushsize);
1632 
1633 	/* Opportunistically move the tail forward if we can */
1634 	mutex_enter(&wl->wl_mtx);
1635 	delta = wl->wl_reclaimable_bytes;
1636 	mutex_exit(&wl->wl_mtx);
1637 	wapbl_advance_tail(wl->wl_circ_size, wl->wl_circ_off, delta,
1638 	    &head, &tail);
1639 
1640 	error = wapbl_write_commit(wl, head, tail);
1641 	if (error)
1642 		goto out;
1643 
1644 	we = pool_get(&wapbl_entry_pool, PR_WAITOK);
1645 
1646 #ifdef WAPBL_DEBUG_BUFBYTES
1647 	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
1648 		("wapbl_flush: thread %d.%d head+=%zu tail+=%zu used=%zu"
1649 		 " unsynced=%zu"
1650 		 "\n\tbufcount=%zu bufbytes=%zu bcount=%zu deallocs=%d "
1651 		 "inodes=%d\n",
1652 		 curproc->p_pid, curlwp->l_lid, flushsize, delta,
1653 		 wapbl_space_used(wl->wl_circ_size, head, tail),
1654 		 wl->wl_unsynced_bufbytes, wl->wl_bufcount,
1655 		 wl->wl_bufbytes, wl->wl_bcount, wl->wl_dealloccnt,
1656 		 wl->wl_inohashcnt));
1657 #else
1658 	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
1659 		("wapbl_flush: thread %d.%d head+=%zu tail+=%zu used=%zu"
1660 		 "\n\tbufcount=%zu bufbytes=%zu bcount=%zu deallocs=%d "
1661 		 "inodes=%d\n",
1662 		 curproc->p_pid, curlwp->l_lid, flushsize, delta,
1663 		 wapbl_space_used(wl->wl_circ_size, head, tail),
1664 		 wl->wl_bufcount, wl->wl_bufbytes, wl->wl_bcount,
1665 		 wl->wl_dealloccnt, wl->wl_inohashcnt));
1666 #endif
1667 
1668 
1669 	mutex_enter(&bufcache_lock);
1670 	mutex_enter(&wl->wl_mtx);
1671 
1672 	wl->wl_reserved_bytes = reserved;
1673 	wl->wl_head = head;
1674 	wl->wl_tail = tail;
1675 	KASSERT(wl->wl_reclaimable_bytes >= delta);
1676 	wl->wl_reclaimable_bytes -= delta;
1677 	wl->wl_dealloccnt = 0;
1678 #ifdef WAPBL_DEBUG_BUFBYTES
1679 	wl->wl_unsynced_bufbytes += wl->wl_bufbytes;
1680 #endif
1681 
1682 	we->we_wapbl = wl;
1683 	we->we_bufcount = wl->wl_bufcount;
1684 #ifdef WAPBL_DEBUG_BUFBYTES
1685 	we->we_unsynced_bufbytes = wl->wl_bufbytes;
1686 #endif
1687 	we->we_reclaimable_bytes = flushsize;
1688 	we->we_error = 0;
1689 	SIMPLEQ_INSERT_TAIL(&wl->wl_entries, we, we_entries);
1690 
1691 	/*
1692 	 * This flushes bufs in the reverse of the order they were queued;
1693 	 * it shouldn't matter, but if we care we could use a TAILQ instead.
1694 	 * XXX Note they will get put on the lru queue when they flush,
1695 	 * so we might actually want to change this to preserve order.
1696 	 */
1697 	while ((bp = LIST_FIRST(&wl->wl_bufs)) != NULL) {
1698 		if (bbusy(bp, 0, 0, &wl->wl_mtx)) {
1699 			continue;
1700 		}
1701 		bp->b_iodone = wapbl_biodone;
1702 		bp->b_private = we;
1703 		bremfree(bp);
1704 		wapbl_remove_buf_locked(wl, bp);
1705 		mutex_exit(&wl->wl_mtx);
1706 		mutex_exit(&bufcache_lock);
1707 		bawrite(bp);
1708 		mutex_enter(&bufcache_lock);
1709 		mutex_enter(&wl->wl_mtx);
1710 	}
1711 	mutex_exit(&wl->wl_mtx);
1712 	mutex_exit(&bufcache_lock);
1713 
1714 #if 0
1715 	WAPBL_PRINTF(WAPBL_PRINT_FLUSH,
1716 		     ("wapbl_flush thread %d.%d done flushing entries...\n",
1717 		     curproc->p_pid, curlwp->l_lid));
1718 #endif
1719 
1720  wait_out:
1721 
1722 	/*
1723 	 * If the waitfor flag is set, don't return until everything is
1724 	 * fully flushed and the on disk log is empty.
1725 	 */
1726 	if (waitfor) {
1727 		error = wapbl_truncate(wl, wl->wl_circ_size -
1728 			wl->wl_reserved_bytes);
1729 	}
1730 
1731  out:
1732 	if (error) {
1733 		wl->wl_flush_abort(wl->wl_mount, wl->wl_deallocblks,
1734 		    wl->wl_dealloclens, wl->wl_dealloccnt);
1735 	}
1736 
1737 #ifdef WAPBL_DEBUG_PRINT
1738 	if (error) {
1739 		pid_t pid = -1;
1740 		lwpid_t lid = -1;
1741 		if (curproc)
1742 			pid = curproc->p_pid;
1743 		if (curlwp)
1744 			lid = curlwp->l_lid;
1745 		mutex_enter(&wl->wl_mtx);
1746 #ifdef WAPBL_DEBUG_BUFBYTES
1747 		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
1748 		    ("wapbl_flush: thread %d.%d aborted flush: "
1749 		    "error = %d\n"
1750 		    "\tbufcount=%zu bufbytes=%zu bcount=%zu "
1751 		    "deallocs=%d inodes=%d\n"
1752 		    "\terrcnt = %d, reclaimable=%zu reserved=%zu "
1753 		    "unsynced=%zu\n",
1754 		    pid, lid, error, wl->wl_bufcount,
1755 		    wl->wl_bufbytes, wl->wl_bcount,
1756 		    wl->wl_dealloccnt, wl->wl_inohashcnt,
1757 		    wl->wl_error_count, wl->wl_reclaimable_bytes,
1758 		    wl->wl_reserved_bytes, wl->wl_unsynced_bufbytes));
1759 		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
1760 			WAPBL_PRINTF(WAPBL_PRINT_ERROR,
1761 			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
1762 			     "error = %d, unsynced = %zu\n",
1763 			     we->we_bufcount, we->we_reclaimable_bytes,
1764 			     we->we_error, we->we_unsynced_bufbytes));
1765 		}
1766 #else
1767 		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
1768 		    ("wapbl_flush: thread %d.%d aborted flush: "
1769 		     "error = %d\n"
1770 		     "\tbufcount=%zu bufbytes=%zu bcount=%zu "
1771 		     "deallocs=%d inodes=%d\n"
1772 		     "\terrcnt = %d, reclaimable=%zu reserved=%zu\n",
1773 		     pid, lid, error, wl->wl_bufcount,
1774 		     wl->wl_bufbytes, wl->wl_bcount,
1775 		     wl->wl_dealloccnt, wl->wl_inohashcnt,
1776 		     wl->wl_error_count, wl->wl_reclaimable_bytes,
1777 		     wl->wl_reserved_bytes));
1778 		SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
1779 			WAPBL_PRINTF(WAPBL_PRINT_ERROR,
1780 			    ("\tentry: bufcount = %zu, reclaimable = %zu, "
1781 			     "error = %d\n", we->we_bufcount,
1782 			     we->we_reclaimable_bytes, we->we_error));
1783 		}
1784 #endif
1785 		mutex_exit(&wl->wl_mtx);
1786 	}
1787 #endif
1788 
1789 	rw_exit(&wl->wl_rwlock);
1790 	return error;
1791 }
1792 
1793 /****************************************************************/
1794 
1795 void
1796 wapbl_jlock_assert(struct wapbl *wl)
1797 {
1798 
1799 	KASSERT(rw_lock_held(&wl->wl_rwlock));
1800 }
1801 
1802 void
1803 wapbl_junlock_assert(struct wapbl *wl)
1804 {
1805 
1806 	KASSERT(!rw_write_held(&wl->wl_rwlock));
1807 }
1808 
1809 /****************************************************************/
1810 
1811 /* locks missing */
1812 void
1813 wapbl_print(struct wapbl *wl,
1814 		int full,
1815 		void (*pr)(const char *, ...))
1816 {
1817 	struct buf *bp;
1818 	struct wapbl_entry *we;
1819 	(*pr)("wapbl %p", wl);
1820 	(*pr)("\nlogvp = %p, devvp = %p, logpbn = %"PRId64"\n",
1821 	      wl->wl_logvp, wl->wl_devvp, wl->wl_logpbn);
1822 	(*pr)("circ = %zu, header = %zu, head = %"PRIdMAX" tail = %"PRIdMAX"\n",
1823 	      wl->wl_circ_size, wl->wl_circ_off,
1824 	      (intmax_t)wl->wl_head, (intmax_t)wl->wl_tail);
1825 	(*pr)("fs_dev_bshift = %d, log_dev_bshift = %d\n",
1826 	      wl->wl_log_dev_bshift, wl->wl_fs_dev_bshift);
1827 #ifdef WAPBL_DEBUG_BUFBYTES
1828 	(*pr)("bufcount = %zu, bufbytes = %zu bcount = %zu reclaimable = %zu "
1829 	      "reserved = %zu errcnt = %d unsynced = %zu\n",
1830 	      wl->wl_bufcount, wl->wl_bufbytes, wl->wl_bcount,
1831 	      wl->wl_reclaimable_bytes, wl->wl_reserved_bytes,
	      wl->wl_error_count, wl->wl_unsynced_bufbytes);
1833 #else
1834 	(*pr)("bufcount = %zu, bufbytes = %zu bcount = %zu reclaimable = %zu "
1835 	      "reserved = %zu errcnt = %d\n", wl->wl_bufcount, wl->wl_bufbytes,
1836 	      wl->wl_bcount, wl->wl_reclaimable_bytes, wl->wl_reserved_bytes,
	      wl->wl_error_count);
1838 #endif
1839 	(*pr)("\tdealloccnt = %d, dealloclim = %d\n",
1840 	      wl->wl_dealloccnt, wl->wl_dealloclim);
1841 	(*pr)("\tinohashcnt = %d, inohashmask = 0x%08x\n",
1842 	      wl->wl_inohashcnt, wl->wl_inohashmask);
1843 	(*pr)("entries:\n");
1844 	SIMPLEQ_FOREACH(we, &wl->wl_entries, we_entries) {
1845 #ifdef WAPBL_DEBUG_BUFBYTES
1846 		(*pr)("\tbufcount = %zu, reclaimable = %zu, error = %d, "
1847 		      "unsynced = %zu\n",
1848 		      we->we_bufcount, we->we_reclaimable_bytes,
1849 		      we->we_error, we->we_unsynced_bufbytes);
1850 #else
1851 		(*pr)("\tbufcount = %zu, reclaimable = %zu, error = %d\n",
1852 		      we->we_bufcount, we->we_reclaimable_bytes, we->we_error);
1853 #endif
1854 	}
1855 	if (full) {
1856 		int cnt = 0;
1857 		(*pr)("bufs =");
1858 		LIST_FOREACH(bp, &wl->wl_bufs, b_wapbllist) {
1859 			if (!LIST_NEXT(bp, b_wapbllist)) {
1860 				(*pr)(" %p", bp);
1861 			} else if ((++cnt % 6) == 0) {
1862 				(*pr)(" %p,\n\t", bp);
1863 			} else {
1864 				(*pr)(" %p,", bp);
1865 			}
1866 		}
1867 		(*pr)("\n");
1868 
1869 		(*pr)("dealloced blks = ");
1870 		{
1871 			int i;
1872 			cnt = 0;
1873 			for (i = 0; i < wl->wl_dealloccnt; i++) {
1874 				(*pr)(" %"PRId64":%d,",
1875 				      wl->wl_deallocblks[i],
1876 				      wl->wl_dealloclens[i]);
1877 				if ((++cnt % 4) == 0) {
1878 					(*pr)("\n\t");
1879 				}
1880 			}
1881 		}
1882 		(*pr)("\n");
1883 
1884 		(*pr)("registered inodes = ");
1885 		{
1886 			int i;
1887 			cnt = 0;
1888 			for (i = 0; i <= wl->wl_inohashmask; i++) {
1889 				struct wapbl_ino_head *wih;
1890 				struct wapbl_ino *wi;
1891 
1892 				wih = &wl->wl_inohash[i];
1893 				LIST_FOREACH(wi, wih, wi_hash) {
1894 					if (wi->wi_ino == 0)
1895 						continue;
1896 					(*pr)(" %"PRIu64"/0%06"PRIo32",",
1897 					    wi->wi_ino, wi->wi_mode);
1898 					if ((++cnt % 4) == 0) {
1899 						(*pr)("\n\t");
1900 					}
1901 				}
1902 			}
1903 			(*pr)("\n");
1904 		}
1905 	}
1906 }
1907 
1908 #if defined(WAPBL_DEBUG) || defined(DDB)
1909 void
1910 wapbl_dump(struct wapbl *wl)
1911 {
1912 #if defined(WAPBL_DEBUG)
1913 	if (!wl)
1914 		wl = wapbl_debug_wl;
1915 #endif
1916 	if (!wl)
1917 		return;
1918 	wapbl_print(wl, 1, printf);
1919 }
1920 #endif
1921 
1922 /****************************************************************/
1923 
1924 void
1925 wapbl_register_deallocation(struct wapbl *wl, daddr_t blk, int len)
1926 {
1927 
1928 	wapbl_jlock_assert(wl);
1929 
1930 	mutex_enter(&wl->wl_mtx);
1931 	/* XXX should eventually instead tie this into resource estimation */
1932 	/*
1933 	 * XXX this panic needs locking/mutex analysis and the
1934 	 * ability to cope with the failure.
1935 	 */
1936 	/* XXX this XXX doesn't have enough XXX */
1937 	if (__predict_false(wl->wl_dealloccnt >= wl->wl_dealloclim))
1938 		panic("wapbl_register_deallocation: out of resources");
1939 
1940 	wl->wl_deallocblks[wl->wl_dealloccnt] = blk;
1941 	wl->wl_dealloclens[wl->wl_dealloccnt] = len;
1942 	wl->wl_dealloccnt++;
1943 	WAPBL_PRINTF(WAPBL_PRINT_ALLOC,
1944 	    ("wapbl_register_deallocation: blk=%"PRId64" len=%d\n", blk, len));
1945 	mutex_exit(&wl->wl_mtx);
1946 }
1947 
1948 /****************************************************************/
1949 
1950 static void
1951 wapbl_inodetrk_init(struct wapbl *wl, u_int size)
1952 {
1953 
1954 	wl->wl_inohash = hashinit(size, HASH_LIST, true, &wl->wl_inohashmask);
1955 	if (atomic_inc_uint_nv(&wapbl_ino_pool_refcount) == 1) {
1956 		pool_init(&wapbl_ino_pool, sizeof(struct wapbl_ino), 0, 0, 0,
1957 		    "wapblinopl", &pool_allocator_nointr, IPL_NONE);
1958 	}
1959 }
1960 
1961 static void
1962 wapbl_inodetrk_free(struct wapbl *wl)
1963 {
1964 
1965 	/* XXX this KASSERT needs locking/mutex analysis */
1966 	KASSERT(wl->wl_inohashcnt == 0);
1967 	hashdone(wl->wl_inohash, HASH_LIST, wl->wl_inohashmask);
1968 	if (atomic_dec_uint_nv(&wapbl_ino_pool_refcount) == 0) {
1969 		pool_destroy(&wapbl_ino_pool);
1970 	}
1971 }
1972 
1973 static struct wapbl_ino *
1974 wapbl_inodetrk_get(struct wapbl *wl, ino_t ino)
1975 {
1976 	struct wapbl_ino_head *wih;
1977 	struct wapbl_ino *wi;
1978 
1979 	KASSERT(mutex_owned(&wl->wl_mtx));
1980 
1981 	wih = &wl->wl_inohash[ino & wl->wl_inohashmask];
1982 	LIST_FOREACH(wi, wih, wi_hash) {
1983 		if (ino == wi->wi_ino)
1984 			return wi;
1985 	}
	return NULL;
1987 }
1988 
1989 void
1990 wapbl_register_inode(struct wapbl *wl, ino_t ino, mode_t mode)
1991 {
1992 	struct wapbl_ino_head *wih;
1993 	struct wapbl_ino *wi;
1994 
1995 	wi = pool_get(&wapbl_ino_pool, PR_WAITOK);
1996 
1997 	mutex_enter(&wl->wl_mtx);
1998 	if (wapbl_inodetrk_get(wl, ino) == NULL) {
1999 		wi->wi_ino = ino;
2000 		wi->wi_mode = mode;
2001 		wih = &wl->wl_inohash[ino & wl->wl_inohashmask];
2002 		LIST_INSERT_HEAD(wih, wi, wi_hash);
2003 		wl->wl_inohashcnt++;
2004 		WAPBL_PRINTF(WAPBL_PRINT_INODE,
2005 		    ("wapbl_register_inode: ino=%"PRId64"\n", ino));
2006 		mutex_exit(&wl->wl_mtx);
2007 	} else {
2008 		mutex_exit(&wl->wl_mtx);
2009 		pool_put(&wapbl_ino_pool, wi);
2010 	}
2011 }
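
/*
 * Note the allocate-then-check pattern above: pool_get(PR_WAITOK) may
 * sleep, so the entry is allocated before wl_mtx is taken, and it is
 * simply returned to the pool if the inode turns out to be registered
 * already.  This keeps the mutex hold time short and sleep-free.
 */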
2012 
2013 void
2014 wapbl_unregister_inode(struct wapbl *wl, ino_t ino, mode_t mode)
2015 {
2016 	struct wapbl_ino *wi;
2017 
2018 	mutex_enter(&wl->wl_mtx);
2019 	wi = wapbl_inodetrk_get(wl, ino);
2020 	if (wi) {
2021 		WAPBL_PRINTF(WAPBL_PRINT_INODE,
2022 		    ("wapbl_unregister_inode: ino=%"PRId64"\n", ino));
2023 		KASSERT(wl->wl_inohashcnt > 0);
2024 		wl->wl_inohashcnt--;
2025 		LIST_REMOVE(wi, wi_hash);
2026 		mutex_exit(&wl->wl_mtx);
2027 
2028 		pool_put(&wapbl_ino_pool, wi);
2029 	} else {
2030 		mutex_exit(&wl->wl_mtx);
2031 	}
2032 }
2033 
2034 /****************************************************************/
2035 
2036 /*
2037  * wapbl_transaction_inodes_len(wl)
2038  *
2039  *	Calculate the number of bytes required for inode registration
2040  *	log records in wl.
2041  */
2042 static inline size_t
2043 wapbl_transaction_inodes_len(struct wapbl *wl)
2044 {
2045 	int blocklen = 1<<wl->wl_log_dev_bshift;
2046 	int iph;
2047 
	/* Calculate the number of inodes described in an inodelist header */
2049 	iph = (blocklen - offsetof(struct wapbl_wc_inodelist, wc_inodes)) /
2050 	    sizeof(((struct wapbl_wc_inodelist *)0)->wc_inodes[0]);
2051 
2052 	KASSERT(iph > 0);
2053 
2054 	return MAX(1, howmany(wl->wl_inohashcnt, iph)) * blocklen;
2055 }
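
/*
 * Worked example for wapbl_transaction_inodes_len (illustrative sizes,
 * assuming a 9-bit log device block shift, a 16-byte wapbl_wc_inodelist
 * header, and 16-byte wc_inodes entries; the real values depend on the
 * structure layout):
 *
 *	blocklen = 1 << 9 = 512
 *	iph = (512 - 16) / 16 = 31 inodes per header block
 *
 * so 100 registered inodes need howmany(100, 31) = 4 blocks (2048
 * bytes), and zero registered inodes still cost one block because of
 * the MAX(1, ...) clamp.
 */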
2056 
2057 
2058 /*
2059  * wapbl_transaction_len(wl)
2060  *
2061  *	Calculate number of bytes required for all log records in wl.
2062  */
2063 static size_t
2064 wapbl_transaction_len(struct wapbl *wl)
2065 {
2066 	int blocklen = 1<<wl->wl_log_dev_bshift;
2067 	size_t len;
2068 	int bph;
2069 
2070 	/* Calculate number of blocks described in a blocklist header */
2071 	bph = (blocklen - offsetof(struct wapbl_wc_blocklist, wc_blocks)) /
2072 	    sizeof(((struct wapbl_wc_blocklist *)0)->wc_blocks[0]);
2073 
2074 	KASSERT(bph > 0);
2075 
2076 	len = wl->wl_bcount;
2077 	len += howmany(wl->wl_bufcount, bph) * blocklen;
2078 	len += howmany(wl->wl_dealloccnt, bph) * blocklen;
2079 	len += wapbl_transaction_inodes_len(wl);
2080 
2081 	return len;
2082 }
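
/*
 * Worked example for wapbl_transaction_len (illustrative, assumed
 * numbers): with blocklen = 512, bph = 31, 40 buffers carrying
 * wl_bcount = 20480 bytes of data, 10 pending deallocations, and no
 * registered inodes:
 *
 *	len  = 20480			buffer data
 *	len += howmany(40, 31) * 512	= 1024 for blocklist headers
 *	len += howmany(10, 31) * 512	=  512 for revocation headers
 *	len += 1 * 512			=  512 for the empty inode list
 *
 * for a total of 22528 bytes of log space.
 */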
2083 
2084 /*
2085  * wapbl_cache_sync(wl, msg)
2086  *
2087  *	Issue DIOCCACHESYNC to wl->wl_devvp.
2088  *
2089  *	If sysctl(vfs.wapbl.verbose_commit) >= 2, print a message
2090  *	including msg about the duration of the cache sync.
2091  */
2092 static int
2093 wapbl_cache_sync(struct wapbl *wl, const char *msg)
2094 {
2095 	const bool verbose = wapbl_verbose_commit >= 2;
2096 	struct bintime start_time;
2097 	int force = 1;
2098 	int error;
2099 
2100 	if (!wapbl_flush_disk_cache) {
2101 		return 0;
2102 	}
2103 	if (verbose) {
2104 		bintime(&start_time);
2105 	}
2106 	error = VOP_IOCTL(wl->wl_devvp, DIOCCACHESYNC, &force,
2107 	    FWRITE, FSCRED);
2108 	if (error) {
2109 		WAPBL_PRINTF(WAPBL_PRINT_ERROR,
2110 		    ("wapbl_cache_sync: DIOCCACHESYNC on dev 0x%jx "
2111 		    "returned %d\n", (uintmax_t)wl->wl_devvp->v_rdev, error));
2112 	}
2113 	if (verbose) {
2114 		struct bintime d;
2115 		struct timespec ts;
2116 
2117 		bintime(&d);
2118 		bintime_sub(&d, &start_time);
2119 		bintime2timespec(&d, &ts);
2120 		printf("wapbl_cache_sync: %s: dev 0x%jx %ju.%09lu\n",
2121 		    msg, (uintmax_t)wl->wl_devvp->v_rdev,
2122 		    (uintmax_t)ts.tv_sec, ts.tv_nsec);
2123 	}
2124 	return error;
2125 }
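
/*
 * Usage sketch (assuming the vfs.wapbl sysctl subtree registered by
 * this file is present on the running kernel): the timing printout can
 * be enabled from userland with
 *
 *	sysctl -w vfs.wapbl.verbose_commit=2
 *
 * and the cache flush can be disabled entirely, trading durability
 * across power loss for speed, with
 *
 *	sysctl -w vfs.wapbl.flush_disk_cache=0
 */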
2126 
2127 /*
2128  * wapbl_write_commit(wl, head, tail)
2129  *
2130  *	Issue a disk cache sync to wait for all pending writes to the
2131  *	log to complete, and then synchronously commit the current
2132  *	circular queue head and tail to the log, in the next of two
2133  *	locations for commit headers on disk.
2134  *
2135  *	Increment the generation number.  If the generation number
2136  *	rolls over to zero, then a subsequent commit would appear to
2137  *	have an older generation than this one -- in that case, issue a
2138  *	duplicate commit to avoid this.
2139  *
2140  *	=> Caller must have exclusive access to wl, either by holding
2141  *	wl->wl_rwlock for writer or by being wapbl_start before anyone
2142  *	else has seen wl.
2143  */
2144 static int
2145 wapbl_write_commit(struct wapbl *wl, off_t head, off_t tail)
2146 {
2147 	struct wapbl_wc_header *wc = wl->wl_wc_header;
2148 	struct timespec ts;
2149 	int error;
2150 	daddr_t pbn;
2151 
2152 	error = wapbl_buffered_flush(wl);
2153 	if (error)
2154 		return error;
	/*
	 * Flush the disk cache to ensure that the blocks we've written
	 * are actually on stable storage before the commit header.
	 *
	 * XXX We should calculate a checksum here instead; the cache
	 * flush is what we rely on for now.
	 */
2161 	wapbl_cache_sync(wl, "1");
2162 
2163 	wc->wc_head = head;
2164 	wc->wc_tail = tail;
2165 	wc->wc_checksum = 0;
2166 	wc->wc_version = 1;
2167 	getnanotime(&ts);
2168 	wc->wc_time = ts.tv_sec;
2169 	wc->wc_timensec = ts.tv_nsec;
2170 
2171 	WAPBL_PRINTF(WAPBL_PRINT_WRITE,
2172 	    ("wapbl_write_commit: head = %"PRIdMAX "tail = %"PRIdMAX"\n",
2173 	    (intmax_t)head, (intmax_t)tail));
2174 
	/*
	 * Write the commit header.
	 *
	 * XXX If the generation is about to roll over, we should first
	 * zero the second commit header before trying to write both
	 * headers.
	 */
2181 
2182 	pbn = wl->wl_logpbn + (wc->wc_generation % 2);
2183 #ifdef _KERNEL
2184 	pbn = btodb(pbn << wc->wc_log_dev_bshift);
2185 #endif
2186 	error = wapbl_buffered_write(wc, wc->wc_len, wl, pbn);
2187 	if (error)
2188 		return error;
2189 	error = wapbl_buffered_flush(wl);
2190 	if (error)
2191 		return error;
2192 
2193 	/*
2194 	 * flush disk cache to ensure that the commit header is actually
2195 	 * written before meta data blocks.
2196 	 */
2197 	wapbl_cache_sync(wl, "2");
2198 
	/*
	 * If the generation number was zero, write it out a second time.
	 * This handles initialization and generation number rollover.
	 */
2203 	if (wc->wc_generation++ == 0) {
2204 		error = wapbl_write_commit(wl, head, tail);
		/*
		 * This panic could be removed if we did the zeroing
		 * mentioned above and were certain to roll the
		 * generation number back on failure.
		 */
2210 		if (error)
2211 			panic("wapbl_write_commit: error writing duplicate "
2212 			      "log header: %d", error);
2213 	}
2214 	return 0;
2215 }
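
/*
 * Illustrative sketch of the two commit header slots: commits alternate
 * between physical block wl_logpbn + 0 and wl_logpbn + 1 according to
 * wc_generation % 2, and replay picks whichever slot carries the newer
 * generation:
 *
 *	generation 0 -> slot 0	(written twice at initialization, see
 *	generation 1 -> slot 1	 the rollover handling above)
 *	generation 2 -> slot 0
 *	generation 3 -> slot 1
 *
 * A torn write can therefore only corrupt the slot currently being
 * written; the other slot still holds the previous consistent
 * head/tail pair.
 */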
2216 
2217 /*
2218  * wapbl_write_blocks(wl, offp)
2219  *
2220  *	Write all pending physical blocks in the current transaction
2221  *	from wapbl_add_buf to the log on disk, adding to the circular
2222  *	queue head at byte offset *offp, and returning the new head's
2223  *	byte offset in *offp.
2224  */
2225 static int
2226 wapbl_write_blocks(struct wapbl *wl, off_t *offp)
2227 {
2228 	struct wapbl_wc_blocklist *wc =
2229 	    (struct wapbl_wc_blocklist *)wl->wl_wc_scratch;
2230 	int blocklen = 1<<wl->wl_log_dev_bshift;
2231 	int bph;
2232 	struct buf *bp;
2233 	off_t off = *offp;
2234 	int error;
2235 	size_t padding;
2236 
2237 	KASSERT(rw_write_held(&wl->wl_rwlock));
2238 
2239 	bph = (blocklen - offsetof(struct wapbl_wc_blocklist, wc_blocks)) /
2240 	    sizeof(((struct wapbl_wc_blocklist *)0)->wc_blocks[0]);
2241 
2242 	bp = LIST_FIRST(&wl->wl_bufs);
2243 
2244 	while (bp) {
2245 		int cnt;
2246 		struct buf *obp = bp;
2247 
2248 		KASSERT(bp->b_flags & B_LOCKED);
2249 
2250 		wc->wc_type = WAPBL_WC_BLOCKS;
2251 		wc->wc_len = blocklen;
2252 		wc->wc_blkcount = 0;
2253 		while (bp && (wc->wc_blkcount < bph)) {
2254 			/*
2255 			 * Make sure all the physical block numbers are up to
2256 			 * date.  If this is not always true on a given
2257 			 * filesystem, then VOP_BMAP must be called.  We
2258 			 * could call VOP_BMAP here, or else in the filesystem
2259 			 * specific flush callback, although neither of those
2260 			 * solutions allow us to take the vnode lock.  If a
2261 			 * filesystem requires that we must take the vnode lock
2262 			 * to call VOP_BMAP, then we can probably do it in
2263 			 * bwrite when the vnode lock should already be held
2264 			 * by the invoking code.
2265 			 */
2266 			KASSERT((bp->b_vp->v_type == VBLK) ||
2267 				 (bp->b_blkno != bp->b_lblkno));
2268 			KASSERT(bp->b_blkno > 0);
2269 
2270 			wc->wc_blocks[wc->wc_blkcount].wc_daddr = bp->b_blkno;
2271 			wc->wc_blocks[wc->wc_blkcount].wc_dlen = bp->b_bcount;
2272 			wc->wc_len += bp->b_bcount;
2273 			wc->wc_blkcount++;
2274 			bp = LIST_NEXT(bp, b_wapbllist);
2275 		}
2276 		if (wc->wc_len % blocklen != 0) {
2277 			padding = blocklen - wc->wc_len % blocklen;
2278 			wc->wc_len += padding;
2279 		} else {
2280 			padding = 0;
2281 		}
2282 
2283 		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
2284 		    ("wapbl_write_blocks: len = %u (padding %zu) off = %"PRIdMAX"\n",
2285 		    wc->wc_len, padding, (intmax_t)off));
2286 
2287 		error = wapbl_circ_write(wl, wc, blocklen, &off);
2288 		if (error)
2289 			return error;
2290 		bp = obp;
2291 		cnt = 0;
2292 		while (bp && (cnt++ < bph)) {
2293 			error = wapbl_circ_write(wl, bp->b_data,
2294 			    bp->b_bcount, &off);
2295 			if (error)
2296 				return error;
2297 			bp = LIST_NEXT(bp, b_wapbllist);
2298 		}
2299 		if (padding) {
2300 			void *zero;
2301 
2302 			zero = wapbl_alloc(padding);
2303 			memset(zero, 0, padding);
2304 			error = wapbl_circ_write(wl, zero, padding, &off);
2305 			wapbl_free(zero, padding);
2306 			if (error)
2307 				return error;
2308 		}
2309 	}
2310 	*offp = off;
2311 	return 0;
2312 }
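
/*
 * Padding example for wapbl_write_blocks (illustrative numbers): with
 * blocklen = 512, a batch holding one 3072-byte and one 300-byte buffer
 * gives wc_len = 512 + 3072 + 300 = 3884, so padding = 512 - 3884 % 512
 * = 212 zero bytes are appended to round the record up to 4096 bytes,
 * keeping every record a whole number of log device blocks.
 */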
2313 
2314 /*
2315  * wapbl_write_revocations(wl, offp)
2316  *
2317  *	Write all pending deallocations in the current transaction from
2318  *	wapbl_register_deallocation to the log on disk, adding to the
2319  *	circular queue's head at byte offset *offp, and returning the
2320  *	new head's byte offset in *offp.
2321  */
2322 static int
2323 wapbl_write_revocations(struct wapbl *wl, off_t *offp)
2324 {
2325 	struct wapbl_wc_blocklist *wc =
2326 	    (struct wapbl_wc_blocklist *)wl->wl_wc_scratch;
2327 	int i;
2328 	int blocklen = 1<<wl->wl_log_dev_bshift;
2329 	int bph;
2330 	off_t off = *offp;
2331 	int error;
2332 
2333 	if (wl->wl_dealloccnt == 0)
2334 		return 0;
2335 
2336 	bph = (blocklen - offsetof(struct wapbl_wc_blocklist, wc_blocks)) /
2337 	    sizeof(((struct wapbl_wc_blocklist *)0)->wc_blocks[0]);
2338 
2339 	i = 0;
2340 	while (i < wl->wl_dealloccnt) {
2341 		wc->wc_type = WAPBL_WC_REVOCATIONS;
2342 		wc->wc_len = blocklen;
2343 		wc->wc_blkcount = 0;
2344 		while ((i < wl->wl_dealloccnt) && (wc->wc_blkcount < bph)) {
2345 			wc->wc_blocks[wc->wc_blkcount].wc_daddr =
2346 			    wl->wl_deallocblks[i];
2347 			wc->wc_blocks[wc->wc_blkcount].wc_dlen =
2348 			    wl->wl_dealloclens[i];
2349 			wc->wc_blkcount++;
2350 			i++;
2351 		}
2352 		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
2353 		    ("wapbl_write_revocations: len = %u off = %"PRIdMAX"\n",
2354 		    wc->wc_len, (intmax_t)off));
2355 		error = wapbl_circ_write(wl, wc, blocklen, &off);
2356 		if (error)
2357 			return error;
2358 	}
2359 	*offp = off;
2360 	return 0;
2361 }
2362 
2363 /*
2364  * wapbl_write_inodes(wl, offp)
2365  *
2366  *	Write all pending inode allocations in the current transaction
2367  *	from wapbl_register_inode to the log on disk, adding to the
2368  *	circular queue's head at byte offset *offp and returning the
2369  *	new head's byte offset in *offp.
2370  */
2371 static int
2372 wapbl_write_inodes(struct wapbl *wl, off_t *offp)
2373 {
2374 	struct wapbl_wc_inodelist *wc =
2375 	    (struct wapbl_wc_inodelist *)wl->wl_wc_scratch;
2376 	int i;
2377 	int blocklen = 1 << wl->wl_log_dev_bshift;
2378 	off_t off = *offp;
2379 	int error;
2380 
2381 	struct wapbl_ino_head *wih;
2382 	struct wapbl_ino *wi;
2383 	int iph;
2384 
2385 	iph = (blocklen - offsetof(struct wapbl_wc_inodelist, wc_inodes)) /
2386 	    sizeof(((struct wapbl_wc_inodelist *)0)->wc_inodes[0]);
2387 
2388 	i = 0;
2389 	wih = &wl->wl_inohash[0];
2390 	wi = 0;
2391 	do {
2392 		wc->wc_type = WAPBL_WC_INODES;
2393 		wc->wc_len = blocklen;
2394 		wc->wc_inocnt = 0;
2395 		wc->wc_clear = (i == 0);
2396 		while ((i < wl->wl_inohashcnt) && (wc->wc_inocnt < iph)) {
2397 			while (!wi) {
2398 				KASSERT((wih - &wl->wl_inohash[0])
2399 				    <= wl->wl_inohashmask);
2400 				wi = LIST_FIRST(wih++);
2401 			}
2402 			wc->wc_inodes[wc->wc_inocnt].wc_inumber = wi->wi_ino;
2403 			wc->wc_inodes[wc->wc_inocnt].wc_imode = wi->wi_mode;
2404 			wc->wc_inocnt++;
2405 			i++;
2406 			wi = LIST_NEXT(wi, wi_hash);
2407 		}
2408 		WAPBL_PRINTF(WAPBL_PRINT_WRITE,
2409 		    ("wapbl_write_inodes: len = %u off = %"PRIdMAX"\n",
2410 		    wc->wc_len, (intmax_t)off));
2411 		error = wapbl_circ_write(wl, wc, blocklen, &off);
2412 		if (error)
2413 			return error;
2414 	} while (i < wl->wl_inohashcnt);
2415 
2416 	*offp = off;
2417 	return 0;
2418 }
2419 
2420 #endif /* _KERNEL */
2421 
2422 /****************************************************************/
2423 
2424 struct wapbl_blk {
2425 	LIST_ENTRY(wapbl_blk) wb_hash;
2426 	daddr_t wb_blk;
2427 	off_t wb_off; /* Offset of this block in the log */
2428 };
2429 #define	WAPBL_BLKPOOL_MIN 83
2430 
2431 static void
2432 wapbl_blkhash_init(struct wapbl_replay *wr, u_int size)
2433 {
2434 	if (size < WAPBL_BLKPOOL_MIN)
2435 		size = WAPBL_BLKPOOL_MIN;
2436 	KASSERT(wr->wr_blkhash == 0);
2437 #ifdef _KERNEL
2438 	wr->wr_blkhash = hashinit(size, HASH_LIST, true, &wr->wr_blkhashmask);
2439 #else /* ! _KERNEL */
2440 	/* Manually implement hashinit */
2441 	{
2442 		unsigned long i, hashsize;
2443 		for (hashsize = 1; hashsize < size; hashsize <<= 1)
2444 			continue;
2445 		wr->wr_blkhash = wapbl_alloc(hashsize * sizeof(*wr->wr_blkhash));
2446 		for (i = 0; i < hashsize; i++)
2447 			LIST_INIT(&wr->wr_blkhash[i]);
2448 		wr->wr_blkhashmask = hashsize - 1;
2449 	}
2450 #endif /* ! _KERNEL */
2451 }
2452 
2453 static void
2454 wapbl_blkhash_free(struct wapbl_replay *wr)
2455 {
2456 	KASSERT(wr->wr_blkhashcnt == 0);
2457 #ifdef _KERNEL
2458 	hashdone(wr->wr_blkhash, HASH_LIST, wr->wr_blkhashmask);
2459 #else /* ! _KERNEL */
2460 	wapbl_free(wr->wr_blkhash,
2461 	    (wr->wr_blkhashmask + 1) * sizeof(*wr->wr_blkhash));
2462 #endif /* ! _KERNEL */
2463 }
2464 
2465 static struct wapbl_blk *
2466 wapbl_blkhash_get(struct wapbl_replay *wr, daddr_t blk)
2467 {
2468 	struct wapbl_blk_head *wbh;
2469 	struct wapbl_blk *wb;
2470 	wbh = &wr->wr_blkhash[blk & wr->wr_blkhashmask];
2471 	LIST_FOREACH(wb, wbh, wb_hash) {
2472 		if (blk == wb->wb_blk)
2473 			return wb;
2474 	}
2475 	return 0;
2476 }
2477 
2478 static void
2479 wapbl_blkhash_ins(struct wapbl_replay *wr, daddr_t blk, off_t off)
2480 {
2481 	struct wapbl_blk_head *wbh;
2482 	struct wapbl_blk *wb;
2483 	wb = wapbl_blkhash_get(wr, blk);
2484 	if (wb) {
2485 		KASSERT(wb->wb_blk == blk);
2486 		wb->wb_off = off;
2487 	} else {
2488 		wb = wapbl_alloc(sizeof(*wb));
2489 		wb->wb_blk = blk;
2490 		wb->wb_off = off;
2491 		wbh = &wr->wr_blkhash[blk & wr->wr_blkhashmask];
2492 		LIST_INSERT_HEAD(wbh, wb, wb_hash);
2493 		wr->wr_blkhashcnt++;
2494 	}
2495 }
2496 
2497 static void
2498 wapbl_blkhash_rem(struct wapbl_replay *wr, daddr_t blk)
2499 {
2500 	struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
2501 	if (wb) {
2502 		KASSERT(wr->wr_blkhashcnt > 0);
2503 		wr->wr_blkhashcnt--;
2504 		LIST_REMOVE(wb, wb_hash);
2505 		wapbl_free(wb, sizeof(*wb));
2506 	}
2507 }
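
/*
 * A small illustrative sequence (hypothetical block number and
 * offsets): if block 100 is logged twice and then revoked, the
 * hashtable tracks only its most recent state:
 *
 *	wapbl_blkhash_ins(wr, 100, 0x2000);	entry: blk 100 @ 0x2000
 *	wapbl_blkhash_ins(wr, 100, 0x8000);	entry updated to 0x8000
 *	wapbl_blkhash_rem(wr, 100);		entry removed
 *
 * so replay writes each block at most once, from its final position in
 * the log, and never writes revoked blocks at all.
 */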
2508 
2509 static void
2510 wapbl_blkhash_clear(struct wapbl_replay *wr)
2511 {
2512 	unsigned long i;
2513 	for (i = 0; i <= wr->wr_blkhashmask; i++) {
2514 		struct wapbl_blk *wb;
2515 
2516 		while ((wb = LIST_FIRST(&wr->wr_blkhash[i]))) {
2517 			KASSERT(wr->wr_blkhashcnt > 0);
2518 			wr->wr_blkhashcnt--;
2519 			LIST_REMOVE(wb, wb_hash);
2520 			wapbl_free(wb, sizeof(*wb));
2521 		}
2522 	}
2523 	KASSERT(wr->wr_blkhashcnt == 0);
2524 }
2525 
2526 /****************************************************************/
2527 
2528 /*
2529  * wapbl_circ_read(wr, data, len, offp)
2530  *
2531  *	Read len bytes into data from the circular queue of wr,
2532  *	starting at the linear byte offset *offp, and returning the new
2533  *	linear byte offset in *offp.
2534  *
2535  *	If the starting linear byte offset precedes wr->wr_circ_off,
2536  *	the read instead begins at wr->wr_circ_off.  XXX WTF?  This
2537  *	should be a KASSERT, not a conditional.
2538  */
2539 static int
2540 wapbl_circ_read(struct wapbl_replay *wr, void *data, size_t len, off_t *offp)
2541 {
2542 	size_t slen;
2543 	off_t off = *offp;
2544 	int error;
2545 	daddr_t pbn;
2546 
2547 	KASSERT(((len >> wr->wr_log_dev_bshift) <<
2548 	    wr->wr_log_dev_bshift) == len);
2549 
2550 	if (off < wr->wr_circ_off)
2551 		off = wr->wr_circ_off;
2552 	slen = wr->wr_circ_off + wr->wr_circ_size - off;
2553 	if (slen < len) {
2554 		pbn = wr->wr_logpbn + (off >> wr->wr_log_dev_bshift);
2555 #ifdef _KERNEL
2556 		pbn = btodb(pbn << wr->wr_log_dev_bshift);
2557 #endif
2558 		error = wapbl_read(data, slen, wr->wr_devvp, pbn);
2559 		if (error)
2560 			return error;
2561 		data = (uint8_t *)data + slen;
2562 		len -= slen;
2563 		off = wr->wr_circ_off;
2564 	}
2565 	pbn = wr->wr_logpbn + (off >> wr->wr_log_dev_bshift);
2566 #ifdef _KERNEL
2567 	pbn = btodb(pbn << wr->wr_log_dev_bshift);
2568 #endif
2569 	error = wapbl_read(data, len, wr->wr_devvp, pbn);
2570 	if (error)
2571 		return error;
2572 	off += len;
2573 	if (off >= wr->wr_circ_off + wr->wr_circ_size)
2574 		off = wr->wr_circ_off;
2575 	*offp = off;
2576 	return 0;
2577 }
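
/*
 * Wraparound example (illustrative numbers): with wr_circ_off = 1024
 * and wr_circ_size = 8192 the circular region is [1024, 9216).  A
 * 2048-byte read starting at *offp = 8192 is split into slen = 1024 +
 * 8192 - 8192 = 1024 bytes at offset 8192, followed by the remaining
 * 1024 bytes at offset 1024, and returns with *offp = 2048.
 */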
2578 
2579 /*
2580  * wapbl_circ_advance(wr, len, offp)
2581  *
2582  *	Compute the linear byte offset of the circular queue of wr that
2583  *	is len bytes past *offp, and store it in *offp.
2584  *
2585  *	This is as if wapbl_circ_read, but without actually reading
2586  *	anything.
2587  *
2588  *	If the starting linear byte offset precedes wr->wr_circ_off, it
2589  *	is taken to be wr->wr_circ_off instead.  XXX WTF?  This should
2590  *	be a KASSERT, not a conditional.
2591  */
2592 static void
2593 wapbl_circ_advance(struct wapbl_replay *wr, size_t len, off_t *offp)
2594 {
2595 	size_t slen;
2596 	off_t off = *offp;
2597 
2598 	KASSERT(((len >> wr->wr_log_dev_bshift) <<
2599 	    wr->wr_log_dev_bshift) == len);
2600 
2601 	if (off < wr->wr_circ_off)
2602 		off = wr->wr_circ_off;
2603 	slen = wr->wr_circ_off + wr->wr_circ_size - off;
2604 	if (slen < len) {
2605 		len -= slen;
2606 		off = wr->wr_circ_off;
2607 	}
2608 	off += len;
2609 	if (off >= wr->wr_circ_off + wr->wr_circ_size)
2610 		off = wr->wr_circ_off;
2611 	*offp = off;
2612 }
2613 
2614 /****************************************************************/
2615 
2616 int
2617 wapbl_replay_start(struct wapbl_replay **wrp, struct vnode *vp,
2618 	daddr_t off, size_t count, size_t blksize)
2619 {
2620 	struct wapbl_replay *wr;
2621 	int error;
2622 	struct vnode *devvp;
2623 	daddr_t logpbn;
2624 	uint8_t *scratch;
2625 	struct wapbl_wc_header *wch;
2626 	struct wapbl_wc_header *wch2;
2627 	/* Use this until we read the actual log header */
2628 	int log_dev_bshift = ilog2(blksize);
2629 	size_t used;
2630 	daddr_t pbn;
2631 
2632 	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
2633 	    ("wapbl_replay_start: vp=%p off=%"PRId64 " count=%zu blksize=%zu\n",
2634 	    vp, off, count, blksize));
2635 
2636 	if (off < 0)
2637 		return EINVAL;
2638 
2639 	if (blksize < DEV_BSIZE)
2640 		return EINVAL;
2641 	if (blksize % DEV_BSIZE)
2642 		return EINVAL;
2643 
2644 #ifdef _KERNEL
2645 #if 0
2646 	/* XXX vp->v_size isn't reliably set for VBLK devices,
2647 	 * especially root.  However, we might still want to verify
2648 	 * that the full load is readable */
2649 	if ((off + count) * blksize > vp->v_size)
2650 		return EINVAL;
2651 #endif
2652 	if ((error = VOP_BMAP(vp, off, &devvp, &logpbn, 0)) != 0) {
2653 		return error;
2654 	}
2655 #else /* ! _KERNEL */
2656 	devvp = vp;
2657 	logpbn = off;
2658 #endif /* ! _KERNEL */
2659 
2660 	scratch = wapbl_alloc(MAXBSIZE);
2661 
2662 	pbn = logpbn;
2663 #ifdef _KERNEL
2664 	pbn = btodb(pbn << log_dev_bshift);
2665 #endif
2666 	error = wapbl_read(scratch, 2<<log_dev_bshift, devvp, pbn);
2667 	if (error)
2668 		goto errout;
2669 
2670 	wch = (struct wapbl_wc_header *)scratch;
2671 	wch2 =
2672 	    (struct wapbl_wc_header *)(scratch + (1<<log_dev_bshift));
2673 	/* XXX verify checksums and magic numbers */
2674 	if (wch->wc_type != WAPBL_WC_HEADER) {
2675 		printf("Unrecognized wapbl magic: 0x%08x\n", wch->wc_type);
2676 		error = EFTYPE;
2677 		goto errout;
2678 	}
2679 
2680 	if (wch2->wc_generation > wch->wc_generation)
2681 		wch = wch2;
2682 
2683 	wr = wapbl_calloc(1, sizeof(*wr));
2684 
2685 	wr->wr_logvp = vp;
2686 	wr->wr_devvp = devvp;
2687 	wr->wr_logpbn = logpbn;
2688 
2689 	wr->wr_scratch = scratch;
2690 
2691 	wr->wr_log_dev_bshift = wch->wc_log_dev_bshift;
2692 	wr->wr_fs_dev_bshift = wch->wc_fs_dev_bshift;
2693 	wr->wr_circ_off = wch->wc_circ_off;
2694 	wr->wr_circ_size = wch->wc_circ_size;
2695 	wr->wr_generation = wch->wc_generation;
2696 
2697 	used = wapbl_space_used(wch->wc_circ_size, wch->wc_head, wch->wc_tail);
2698 
2699 	WAPBL_PRINTF(WAPBL_PRINT_REPLAY,
2700 	    ("wapbl_replay: head=%"PRId64" tail=%"PRId64" off=%"PRId64
2701 	    " len=%"PRId64" used=%zu\n",
2702 	    wch->wc_head, wch->wc_tail, wch->wc_circ_off,
2703 	    wch->wc_circ_size, used));
2704 
2705 	wapbl_blkhash_init(wr, (used >> wch->wc_fs_dev_bshift));
2706 
2707 	error = wapbl_replay_process(wr, wch->wc_head, wch->wc_tail);
2708 	if (error) {
2709 		wapbl_replay_stop(wr);
2710 		wapbl_replay_free(wr);
2711 		return error;
2712 	}
2713 
2714 	*wrp = wr;
2715 	return 0;
2716 
2717  errout:
2718 	wapbl_free(scratch, MAXBSIZE);
2719 	return error;
2720 }
2721 
2722 void
2723 wapbl_replay_stop(struct wapbl_replay *wr)
2724 {
2725 
2726 	if (!wapbl_replay_isopen(wr))
2727 		return;
2728 
2729 	WAPBL_PRINTF(WAPBL_PRINT_REPLAY, ("wapbl_replay_stop called\n"));
2730 
2731 	wapbl_free(wr->wr_scratch, MAXBSIZE);
2732 	wr->wr_scratch = NULL;
2733 
2734 	wr->wr_logvp = NULL;
2735 
2736 	wapbl_blkhash_clear(wr);
2737 	wapbl_blkhash_free(wr);
2738 }
2739 
2740 void
2741 wapbl_replay_free(struct wapbl_replay *wr)
2742 {
2743 
2744 	KDASSERT(!wapbl_replay_isopen(wr));
2745 
2746 	if (wr->wr_inodes)
2747 		wapbl_free(wr->wr_inodes,
2748 		    wr->wr_inodescnt * sizeof(wr->wr_inodes[0]));
2749 	wapbl_free(wr, sizeof(*wr));
2750 }
2751 
2752 #ifdef _KERNEL
2753 int
2754 wapbl_replay_isopen1(struct wapbl_replay *wr)
2755 {
2756 
2757 	return wapbl_replay_isopen(wr);
2758 }
2759 #endif
2760 
/*
 * wapbl_block_daddr(wc, i, j, blen)
 *
 *	Calculate the disk address for the i'th block in the wc_blocks
 *	list, offset by j blocks of size blen.
 *
 *	wc_daddr is always a kernel disk address in DEV_BSIZE units
 *	that was written to the journal.
 *
 *	The kernel needs that address plus the offset in DEV_BSIZE
 *	units.
 *
 *	Userland needs that address plus the offset in blen units.
 */
2773 static daddr_t
2774 wapbl_block_daddr(struct wapbl_wc_blocklist *wc, int i, int j, int blen)
2775 {
2776 	daddr_t pbn;
2777 
2778 #ifdef _KERNEL
2779 	pbn = wc->wc_blocks[i].wc_daddr + btodb(j * blen);
2780 #else
2781 	pbn = dbtob(wc->wc_blocks[i].wc_daddr) / blen + j;
2782 #endif
2783 
2784 	return pbn;
2785 }
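
/*
 * Worked example (illustrative, assuming DEV_BSIZE = 512): with
 * wc_daddr = 1000 and blen = 2048, the j = 3rd block lands at
 *
 *	kernel:   1000 + btodb(3 * 2048) = 1000 + 12 = 1012	(DEV_BSIZE units)
 *	userland: dbtob(1000) / 2048 + 3 =  250 +  3 =  253	(blen units)
 */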
2786 
2787 static void
2788 wapbl_replay_process_blocks(struct wapbl_replay *wr, off_t *offp)
2789 {
2790 	struct wapbl_wc_blocklist *wc =
2791 	    (struct wapbl_wc_blocklist *)wr->wr_scratch;
2792 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
2793 	int i, j, n;
2794 
2795 	for (i = 0; i < wc->wc_blkcount; i++) {
2796 		/*
2797 		 * Enter each physical block into the hashtable independently.
2798 		 */
2799 		n = wc->wc_blocks[i].wc_dlen >> wr->wr_fs_dev_bshift;
2800 		for (j = 0; j < n; j++) {
2801 			wapbl_blkhash_ins(wr, wapbl_block_daddr(wc, i, j, fsblklen),
2802 			    *offp);
2803 			wapbl_circ_advance(wr, fsblklen, offp);
2804 		}
2805 	}
2806 }
2807 
2808 static void
2809 wapbl_replay_process_revocations(struct wapbl_replay *wr)
2810 {
2811 	struct wapbl_wc_blocklist *wc =
2812 	    (struct wapbl_wc_blocklist *)wr->wr_scratch;
2813 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
2814 	int i, j, n;
2815 
2816 	for (i = 0; i < wc->wc_blkcount; i++) {
2817 		/*
2818 		 * Remove any blocks found from the hashtable.
2819 		 */
2820 		n = wc->wc_blocks[i].wc_dlen >> wr->wr_fs_dev_bshift;
2821 		for (j = 0; j < n; j++)
2822 			wapbl_blkhash_rem(wr, wapbl_block_daddr(wc, i, j, fsblklen));
2823 	}
2824 }
2825 
2826 static void
2827 wapbl_replay_process_inodes(struct wapbl_replay *wr, off_t oldoff, off_t newoff)
2828 {
2829 	struct wapbl_wc_inodelist *wc =
2830 	    (struct wapbl_wc_inodelist *)wr->wr_scratch;
2831 	void *new_inodes;
2832 	const size_t oldsize = wr->wr_inodescnt * sizeof(wr->wr_inodes[0]);
2833 
2834 	KASSERT(sizeof(wr->wr_inodes[0]) == sizeof(wc->wc_inodes[0]));
2835 
	/*
	 * Keep track of where we found this so the location won't be
	 * overwritten.
	 */
2840 	if (wc->wc_clear) {
2841 		wr->wr_inodestail = oldoff;
2842 		wr->wr_inodescnt = 0;
2843 		if (wr->wr_inodes != NULL) {
2844 			wapbl_free(wr->wr_inodes, oldsize);
2845 			wr->wr_inodes = NULL;
2846 		}
2847 	}
2848 	wr->wr_inodeshead = newoff;
2849 	if (wc->wc_inocnt == 0)
2850 		return;
2851 
2852 	new_inodes = wapbl_alloc((wr->wr_inodescnt + wc->wc_inocnt) *
2853 	    sizeof(wr->wr_inodes[0]));
2854 	if (wr->wr_inodes != NULL) {
2855 		memcpy(new_inodes, wr->wr_inodes, oldsize);
2856 		wapbl_free(wr->wr_inodes, oldsize);
2857 	}
2858 	wr->wr_inodes = new_inodes;
2859 	memcpy(&wr->wr_inodes[wr->wr_inodescnt], wc->wc_inodes,
2860 	    wc->wc_inocnt * sizeof(wr->wr_inodes[0]));
2861 	wr->wr_inodescnt += wc->wc_inocnt;
2862 }
2863 
2864 static int
2865 wapbl_replay_process(struct wapbl_replay *wr, off_t head, off_t tail)
2866 {
2867 	off_t off;
2868 	int error;
2869 
2870 	int logblklen = 1 << wr->wr_log_dev_bshift;
2871 
2872 	wapbl_blkhash_clear(wr);
2873 
2874 	off = tail;
2875 	while (off != head) {
2876 		struct wapbl_wc_null *wcn;
2877 		off_t saveoff = off;
2878 		error = wapbl_circ_read(wr, wr->wr_scratch, logblklen, &off);
2879 		if (error)
2880 			goto errout;
2881 		wcn = (struct wapbl_wc_null *)wr->wr_scratch;
2882 		switch (wcn->wc_type) {
2883 		case WAPBL_WC_BLOCKS:
2884 			wapbl_replay_process_blocks(wr, &off);
2885 			break;
2886 
2887 		case WAPBL_WC_REVOCATIONS:
2888 			wapbl_replay_process_revocations(wr);
2889 			break;
2890 
2891 		case WAPBL_WC_INODES:
2892 			wapbl_replay_process_inodes(wr, saveoff, off);
2893 			break;
2894 
2895 		default:
2896 			printf("Unrecognized wapbl type: 0x%08x\n",
2897 			       wcn->wc_type);
			error = EFTYPE;
2899 			goto errout;
2900 		}
2901 		wapbl_circ_advance(wr, wcn->wc_len, &saveoff);
2902 		if (off != saveoff) {
2903 			printf("wapbl_replay: corrupted records\n");
2904 			error = EFTYPE;
2905 			goto errout;
2906 		}
2907 	}
2908 	return 0;
2909 
2910  errout:
2911 	wapbl_blkhash_clear(wr);
2912 	return error;
2913 }
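
/*
 * Consistency check example (illustrative): every record states its own
 * total length in wc_len, so with logblklen = 512 a WAPBL_WC_BLOCKS
 * record carrying one 4096-byte buffer has wc_len = 512 + 4096 = 4608.
 * Advancing saveoff by wc_len must then land exactly where the
 * per-type processing left off; any disagreement is treated as a
 * corrupted log (EFTYPE).
 */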
2914 
2915 #if 0
2916 int
2917 wapbl_replay_verify(struct wapbl_replay *wr, struct vnode *fsdevvp)
2918 {
2919 	off_t off;
2920 	int mismatchcnt = 0;
2921 	int logblklen = 1 << wr->wr_log_dev_bshift;
2922 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
2923 	void *scratch1 = wapbl_alloc(MAXBSIZE);
2924 	void *scratch2 = wapbl_alloc(MAXBSIZE);
2925 	int error = 0;
2926 
2927 	KDASSERT(wapbl_replay_isopen(wr));
2928 
2929 	off = wch->wc_tail;
2930 	while (off != wch->wc_head) {
2931 		struct wapbl_wc_null *wcn;
2932 #ifdef DEBUG
2933 		off_t saveoff = off;
2934 #endif
2935 		error = wapbl_circ_read(wr, wr->wr_scratch, logblklen, &off);
2936 		if (error)
2937 			goto out;
2938 		wcn = (struct wapbl_wc_null *)wr->wr_scratch;
2939 		switch (wcn->wc_type) {
2940 		case WAPBL_WC_BLOCKS:
2941 			{
2942 				struct wapbl_wc_blocklist *wc =
2943 				    (struct wapbl_wc_blocklist *)wr->wr_scratch;
2944 				int i;
2945 				for (i = 0; i < wc->wc_blkcount; i++) {
2946 					int foundcnt = 0;
2947 					int dirtycnt = 0;
2948 					int j, n;
					/*
					 * Check each physical block
					 * against the hashtable
					 * independently.
					 */
2953 					n = wc->wc_blocks[i].wc_dlen >>
2954 					    wch->wc_fs_dev_bshift;
2955 					for (j = 0; j < n; j++) {
2956 						struct wapbl_blk *wb =
2957 						   wapbl_blkhash_get(wr,
2958 						   wapbl_block_daddr(wc, i, j, fsblklen));
2959 						if (wb && (wb->wb_off == off)) {
2960 							foundcnt++;
2961 							error =
2962 							    wapbl_circ_read(wr,
2963 							    scratch1, fsblklen,
2964 							    &off);
2965 							if (error)
2966 								goto out;
2967 							error =
2968 							    wapbl_read(scratch2,
2969 							    fsblklen, fsdevvp,
2970 							    wb->wb_blk);
2971 							if (error)
2972 								goto out;
2973 							if (memcmp(scratch1,
2974 								   scratch2,
2975 								   fsblklen)) {
2976 								printf(
2977 		"wapbl_verify: mismatch block %"PRId64" at off %"PRIdMAX"\n",
2978 		wb->wb_blk, (intmax_t)off);
2979 								dirtycnt++;
2980 								mismatchcnt++;
2981 							}
2982 						} else {
2983 							wapbl_circ_advance(wr,
2984 							    fsblklen, &off);
2985 						}
2986 					}
2987 #if 0
2988 					/*
2989 					 * If all of the blocks in an entry
2990 					 * are clean, then remove all of its
2991 					 * blocks from the hashtable since they
2992 					 * never will need replay.
2993 					 */
2994 					if ((foundcnt != 0) &&
2995 					    (dirtycnt == 0)) {
2996 						off = saveoff;
2997 						wapbl_circ_advance(wr,
2998 						    logblklen, &off);
2999 						for (j = 0; j < n; j++) {
3000 							struct wapbl_blk *wb =
3001 							   wapbl_blkhash_get(wr,
3002 							   wapbl_block_daddr(wc, i, j, fsblklen));
3003 							if (wb &&
3004 							  (wb->wb_off == off)) {
3005 								wapbl_blkhash_rem(wr, wb->wb_blk);
3006 							}
3007 							wapbl_circ_advance(wr,
3008 							    fsblklen, &off);
3009 						}
3010 					}
3011 #endif
3012 				}
3013 			}
3014 			break;
3015 		case WAPBL_WC_REVOCATIONS:
3016 		case WAPBL_WC_INODES:
3017 			break;
3018 		default:
3019 			KASSERT(0);
3020 		}
3021 #ifdef DEBUG
3022 		wapbl_circ_advance(wr, wcn->wc_len, &saveoff);
3023 		KASSERT(off == saveoff);
3024 #endif
3025 	}
3026  out:
3027 	wapbl_free(scratch1, MAXBSIZE);
3028 	wapbl_free(scratch2, MAXBSIZE);
3029 	if (!error && mismatchcnt)
3030 		error = EFTYPE;
3031 	return error;
3032 }
3033 #endif
3034 
3035 int
3036 wapbl_replay_write(struct wapbl_replay *wr, struct vnode *fsdevvp)
3037 {
3038 	struct wapbl_blk *wb;
3039 	size_t i;
3040 	off_t off;
3041 	void *scratch;
3042 	int error = 0;
3043 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
3044 
3045 	KDASSERT(wapbl_replay_isopen(wr));
3046 
3047 	scratch = wapbl_alloc(MAXBSIZE);
3048 
3049 	for (i = 0; i <= wr->wr_blkhashmask; ++i) {
3050 		LIST_FOREACH(wb, &wr->wr_blkhash[i], wb_hash) {
3051 			off = wb->wb_off;
3052 			error = wapbl_circ_read(wr, scratch, fsblklen, &off);
3053 			if (error)
3054 				break;
3055 			error = wapbl_write(scratch, fsblklen, fsdevvp,
3056 			    wb->wb_blk);
3057 			if (error)
3058 				break;
3059 		}
3060 	}
3061 
3062 	wapbl_free(scratch, MAXBSIZE);
3063 	return error;
3064 }
3065 
3066 int
3067 wapbl_replay_can_read(struct wapbl_replay *wr, daddr_t blk, long len)
3068 {
3069 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
3070 
3071 	KDASSERT(wapbl_replay_isopen(wr));
3072 	KASSERT((len % fsblklen) == 0);
3073 
	while (len != 0) {
		struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
		if (wb)
			return 1;
		len -= fsblklen;
		blk++;
	}
3080 	return 0;
3081 }
3082 
3083 int
3084 wapbl_replay_read(struct wapbl_replay *wr, void *data, daddr_t blk, long len)
3085 {
3086 	int fsblklen = 1 << wr->wr_fs_dev_bshift;
3087 
3088 	KDASSERT(wapbl_replay_isopen(wr));
3089 
3090 	KASSERT((len % fsblklen) == 0);
3091 
3092 	while (len != 0) {
3093 		struct wapbl_blk *wb = wapbl_blkhash_get(wr, blk);
3094 		if (wb) {
3095 			off_t off = wb->wb_off;
3096 			int error;
3097 			error = wapbl_circ_read(wr, data, fsblklen, &off);
3098 			if (error)
3099 				return error;
3100 		}
3101 		data = (uint8_t *)data + fsblklen;
3102 		len -= fsblklen;
3103 		blk++;
3104 	}
3105 	return 0;
3106 }
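
/*
 * Minimal replay usage sketch (hypothetical caller, error handling
 * elided): roughly what a filesystem does at mount time when it finds
 * a dirty log:
 *
 *	struct wapbl_replay *wr;
 *
 *	error = wapbl_replay_start(&wr, devvp, off, count, blksize);
 *	...
 *	error = wapbl_replay_write(wr, fsdevvp);   write saved blocks home
 *	wapbl_replay_stop(wr);                     release scratch and hash
 *	wapbl_replay_free(wr);
 */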
3107 
3108 #ifdef _KERNEL
3109 
3110 MODULE(MODULE_CLASS_VFS, wapbl, NULL);
3111 
3112 static int
3113 wapbl_modcmd(modcmd_t cmd, void *arg)
3114 {
3115 
3116 	switch (cmd) {
3117 	case MODULE_CMD_INIT:
3118 		wapbl_init();
3119 		return 0;
3120 	case MODULE_CMD_FINI:
3121 		return wapbl_fini();
3122 	default:
3123 		return ENOTTY;
3124 	}
3125 }
3126 #endif /* _KERNEL */
3127