xref: /dflybsd-src/sys/kern/vfs_jops.c (revision b5b0912b1891e95ccc48cad83f09239ccb7ffc16)
1 /*
2  * Copyright (c) 2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_jops.c,v 1.13 2005/06/03 23:57:32 dillon Exp $
35  */
36 /*
37  * Each mount point may have zero or more independantly configured journals
38  * attached to it.  Each journal is represented by a memory FIFO and worker
39  * thread.  Journal events are streamed through the FIFO to the thread,
40  * batched up (typically on one-second intervals), and written out by the
41  * thread.
42  *
43  * Journal vnode ops are executed instead of mnt_vn_norm_ops when one or
44  * more journals have been installed on a mount point.  It becomes the
45  * responsibility of the journal op to call the underlying normal op as
46  * appropriate.
47  *
48  * The journaling protocol is intended to evolve into a two-way stream
49  * whereby transaction IDs can be acknowledged by the journaling target
50  * when the data has been committed to hard storage.  Both implicit and
51  * explicit acknowledgement schemes will be supported, depending on the
52  * sophistication of the journaling stream, plus resynchronization and
53  * restart when a journaling stream is interrupted.  This information will
54  * also be made available to journaling-aware filesystems to allow better
55  * management of their own physical storage synchronization mechanisms as
56  * well as to allow such filesystems to take direct advantage of the kernel's
57  * journaling layer so they don't have to roll their own.
58  *
59  * In addition, the worker thread will have access to much larger
60  * spooling areas then the memory buffer is able to provide by e.g.
61  * reserving swap space, in order to absorb potentially long interruptions
62  * of off-site journaling streams, and to prevent 'slow' off-site linkages
63  * from radically slowing down local filesystem operations.
64  *
65  * Because of the non-trivial algorithms the journaling system will be
66  * required to support, use of a worker thread is mandatory.  Efficiencies
67  * are maintained by utilitizing the memory FIFO to batch transactions when
68  * possible, reducing the number of gratuitous thread switches and taking
69  * advantage of cpu caches through the use of shorter batched code paths
70  * rather then trying to do everything in the context of the process
71  * originating the filesystem op.  In the future the memory FIFO can be
72  * made per-cpu to remove BGL or other locking requirements.
73  */
74 #include <sys/param.h>
75 #include <sys/systm.h>
76 #include <sys/buf.h>
77 #include <sys/conf.h>
78 #include <sys/kernel.h>
79 #include <sys/queue.h>
80 #include <sys/lock.h>
81 #include <sys/malloc.h>
82 #include <sys/mount.h>
83 #include <sys/unistd.h>
84 #include <sys/vnode.h>
85 #include <sys/poll.h>
86 #include <sys/mountctl.h>
87 #include <sys/journal.h>
88 #include <sys/file.h>
89 #include <sys/proc.h>
90 #include <sys/msfbuf.h>
91 
92 #include <machine/limits.h>
93 
94 #include <vm/vm.h>
95 #include <vm/vm_object.h>
96 #include <vm/vm_page.h>
97 #include <vm/vm_pager.h>
98 #include <vm/vnode_pager.h>
99 
100 #include <sys/file2.h>
101 #include <sys/thread2.h>
102 
103 static int journal_attach(struct mount *mp);
104 static void journal_detach(struct mount *mp);
105 static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
106 			    const struct mountctl_install_journal *info);
107 static int journal_remove_vfs_journal(struct mount *mp,
108 			    const struct mountctl_remove_journal *info);
109 static int journal_destroy(struct mount *mp, struct journal *jo, int flags);
110 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
111 static int journal_status_vfs_journal(struct mount *mp,
112 		       const struct mountctl_status_journal *info,
113 		       struct mountctl_journal_ret_status *rstat,
114 		       int buflen, int *res);
115 static void journal_wthread(void *info);
116 static void journal_rthread(void *info);
117 
118 static void *journal_reserve(struct journal *jo,
119 			    struct journal_rawrecbeg **rawpp,
120 			    int16_t streamid, int bytes);
121 static void *journal_extend(struct journal *jo,
122 			    struct journal_rawrecbeg **rawpp,
123 			    int truncbytes, int bytes, int *newstreamrecp);
124 static void journal_abort(struct journal *jo,
125 			    struct journal_rawrecbeg **rawpp);
126 static void journal_commit(struct journal *jo,
127 			    struct journal_rawrecbeg **rawpp,
128 			    int bytes, int closeout);
129 
130 static void jrecord_init(struct journal *jo,
131 			    struct jrecord *jrec, int16_t streamid);
132 static struct journal_subrecord *jrecord_push(
133 			    struct jrecord *jrec, int16_t rectype);
134 static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
135 static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
136 			    int16_t rectype, int bytes);
137 static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
138 static void jrecord_done(struct jrecord *jrec, int abortit);
139 
140 static int journal_setattr(struct vop_setattr_args *ap);
141 static int journal_write(struct vop_write_args *ap);
142 static int journal_fsync(struct vop_fsync_args *ap);
143 static int journal_putpages(struct vop_putpages_args *ap);
144 static int journal_setacl(struct vop_setacl_args *ap);
145 static int journal_setextattr(struct vop_setextattr_args *ap);
146 static int journal_ncreate(struct vop_ncreate_args *ap);
147 static int journal_nmknod(struct vop_nmknod_args *ap);
148 static int journal_nlink(struct vop_nlink_args *ap);
149 static int journal_nsymlink(struct vop_nsymlink_args *ap);
150 static int journal_nwhiteout(struct vop_nwhiteout_args *ap);
151 static int journal_nremove(struct vop_nremove_args *ap);
152 static int journal_nmkdir(struct vop_nmkdir_args *ap);
153 static int journal_nrmdir(struct vop_nrmdir_args *ap);
154 static int journal_nrename(struct vop_nrename_args *ap);
155 
156 static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
157     { &vop_default_desc,		vop_journal_operate_ap },
158     { &vop_mountctl_desc,		(void *)journal_mountctl },
159     { &vop_setattr_desc,		(void *)journal_setattr },
160     { &vop_write_desc,			(void *)journal_write },
161     { &vop_fsync_desc,			(void *)journal_fsync },
162     { &vop_putpages_desc,		(void *)journal_putpages },
163     { &vop_setacl_desc,			(void *)journal_setacl },
164     { &vop_setextattr_desc,		(void *)journal_setextattr },
165     { &vop_ncreate_desc,		(void *)journal_ncreate },
166     { &vop_nmknod_desc,			(void *)journal_nmknod },
167     { &vop_nlink_desc,			(void *)journal_nlink },
168     { &vop_nsymlink_desc,		(void *)journal_nsymlink },
169     { &vop_nwhiteout_desc,		(void *)journal_nwhiteout },
170     { &vop_nremove_desc,		(void *)journal_nremove },
171     { &vop_nmkdir_desc,			(void *)journal_nmkdir },
172     { &vop_nrmdir_desc,			(void *)journal_nrmdir },
173     { &vop_nrename_desc,		(void *)journal_nrename },
174     { NULL, NULL }
175 };
176 
177 static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
178 static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
179 
180 int
181 journal_mountctl(struct vop_mountctl_args *ap)
182 {
183     struct mount *mp;
184     int error = 0;
185 
186     mp = ap->a_head.a_ops->vv_mount;
187     KKASSERT(mp);
188 
189     if (mp->mnt_vn_journal_ops == NULL) {
190 	switch(ap->a_op) {
191 	case MOUNTCTL_INSTALL_VFS_JOURNAL:
192 	    error = journal_attach(mp);
193 	    if (error == 0 && ap->a_ctllen != sizeof(struct mountctl_install_journal))
194 		error = EINVAL;
195 	    if (error == 0 && ap->a_fp == NULL)
196 		error = EBADF;
197 	    if (error == 0)
198 		error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
199 	    if (TAILQ_EMPTY(&mp->mnt_jlist))
200 		journal_detach(mp);
201 	    break;
202 	case MOUNTCTL_REMOVE_VFS_JOURNAL:
203 	case MOUNTCTL_RESYNC_VFS_JOURNAL:
204 	case MOUNTCTL_STATUS_VFS_JOURNAL:
205 	    error = ENOENT;
206 	    break;
207 	default:
208 	    error = EOPNOTSUPP;
209 	    break;
210 	}
211     } else {
212 	switch(ap->a_op) {
213 	case MOUNTCTL_INSTALL_VFS_JOURNAL:
214 	    if (ap->a_ctllen != sizeof(struct mountctl_install_journal))
215 		error = EINVAL;
216 	    if (error == 0 && ap->a_fp == NULL)
217 		error = EBADF;
218 	    if (error == 0)
219 		error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
220 	    break;
221 	case MOUNTCTL_REMOVE_VFS_JOURNAL:
222 	    if (ap->a_ctllen != sizeof(struct mountctl_remove_journal))
223 		error = EINVAL;
224 	    if (error == 0)
225 		error = journal_remove_vfs_journal(mp, ap->a_ctl);
226 	    if (TAILQ_EMPTY(&mp->mnt_jlist))
227 		journal_detach(mp);
228 	    break;
229 	case MOUNTCTL_RESYNC_VFS_JOURNAL:
230 	    if (ap->a_ctllen != 0)
231 		error = EINVAL;
232 	    error = journal_resync_vfs_journal(mp, ap->a_ctl);
233 	    break;
234 	case MOUNTCTL_STATUS_VFS_JOURNAL:
235 	    if (ap->a_ctllen != sizeof(struct mountctl_status_journal))
236 		error = EINVAL;
237 	    if (error == 0) {
238 		error = journal_status_vfs_journal(mp, ap->a_ctl,
239 					ap->a_buf, ap->a_buflen, ap->a_res);
240 	    }
241 	    break;
242 	default:
243 	    error = EOPNOTSUPP;
244 	    break;
245 	}
246     }
247     return (error);
248 }
249 
250 /*
251  * High level mount point setup.  When a
252  */
253 static int
254 journal_attach(struct mount *mp)
255 {
256     vfs_add_vnodeops(mp, &mp->mnt_vn_journal_ops, journal_vnodeop_entries);
257     return(0);
258 }
259 
260 static void
261 journal_detach(struct mount *mp)
262 {
263     if (mp->mnt_vn_journal_ops)
264 	vfs_rm_vnodeops(&mp->mnt_vn_journal_ops);
265 }
266 
267 /*
268  * Install a journal on a mount point.  Each journal has an associated worker
269  * thread which is responsible for buffering and spooling the data to the
270  * target.  A mount point may have multiple journals attached to it.  An
271  * initial start record is generated when the journal is associated.
272  */
273 static int
274 journal_install_vfs_journal(struct mount *mp, struct file *fp,
275 			    const struct mountctl_install_journal *info)
276 {
277     struct journal *jo;
278     struct jrecord jrec;
279     int error = 0;
280     int size;
281 
282     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
283     bcopy(info->id, jo->id, sizeof(jo->id));
284     jo->flags = info->flags & ~(MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE |
285 				MC_JOURNAL_STOP_REQ);
286 
287     /*
288      * Memory FIFO size, round to nearest power of 2
289      */
290     if (info->membufsize) {
291 	if (info->membufsize < 65536)
292 	    size = 65536;
293 	else if (info->membufsize > 128 * 1024 * 1024)
294 	    size = 128 * 1024 * 1024;
295 	else
296 	    size = (int)info->membufsize;
297     } else {
298 	size = 1024 * 1024;
299     }
300     jo->fifo.size = 1;
301     while (jo->fifo.size < size)
302 	jo->fifo.size <<= 1;
303 
304     /*
305      * Other parameters.  If not specified the starting transaction id
306      * will be the current date.
307      */
308     if (info->transid) {
309 	jo->transid = info->transid;
310     } else {
311 	struct timespec ts;
312 	getnanotime(&ts);
313 	jo->transid = ((int64_t)ts.tv_sec << 30) | ts.tv_nsec;
314     }
315 
316     jo->fp = fp;
317 
318     /*
319      * Allocate the memory FIFO
320      */
321     jo->fifo.mask = jo->fifo.size - 1;
322     jo->fifo.membase = malloc(jo->fifo.size, M_JFIFO, M_WAITOK|M_ZERO|M_NULLOK);
323     if (jo->fifo.membase == NULL)
324 	error = ENOMEM;
325 
326     /*
327      * Create the worker thread and generate the association record.
328      */
329     if (error) {
330 	free(jo, M_JOURNAL);
331     } else {
332 	fhold(fp);
333 	jo->flags |= MC_JOURNAL_WACTIVE;
334 	lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
335 			TDF_STOPREQ, -1, "journal w:%.*s", JIDMAX, jo->id);
336 	lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
337 	lwkt_schedule(&jo->wthread);
338 
339 	if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
340 	    jo->flags |= MC_JOURNAL_RACTIVE;
341 	    lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
342 			TDF_STOPREQ, -1, "journal r:%.*s", JIDMAX, jo->id);
343 	    lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
344 	    lwkt_schedule(&jo->rthread);
345 	}
346 	jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
347 	jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
348 	jrecord_done(&jrec, 0);
349 	TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
350     }
351     return(error);
352 }
353 
354 /*
355  * Disassociate a journal from a mount point and terminate its worker thread.
356  * A final termination record is written out before the file pointer is
357  * dropped.
358  */
359 static int
360 journal_remove_vfs_journal(struct mount *mp,
361 			   const struct mountctl_remove_journal *info)
362 {
363     struct journal *jo;
364     int error;
365 
366     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
367 	if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
368 	    break;
369     }
370     if (jo)
371 	error = journal_destroy(mp, jo, info->flags);
372     else
373 	error = EINVAL;
374     return (error);
375 }
376 
377 /*
378  * Remove all journals associated with a mount point.  Usually called
379  * by the umount code.
380  */
381 void
382 journal_remove_all_journals(struct mount *mp, int flags)
383 {
384     struct journal *jo;
385 
386     while ((jo = TAILQ_FIRST(&mp->mnt_jlist)) != NULL) {
387 	journal_destroy(mp, jo, flags);
388     }
389 }
390 
391 static int
392 journal_destroy(struct mount *mp, struct journal *jo, int flags)
393 {
394     struct jrecord jrec;
395 
396     TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
397 
398     jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
399     jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
400     jrecord_done(&jrec, 0);
401 
402     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
403     wakeup(&jo->fifo);
404     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
405 	tsleep(jo, 0, "jwait", 0);
406     }
407     lwkt_free_thread(&jo->wthread); /* XXX SMP */
408     if (jo->fp)
409 	fdrop(jo->fp, curthread);
410     if (jo->fifo.membase)
411 	free(jo->fifo.membase, M_JFIFO);
412     free(jo, M_JOURNAL);
413     return(0);
414 }
415 
416 static int
417 journal_resync_vfs_journal(struct mount *mp, const void *ctl)
418 {
419     return(EINVAL);
420 }
421 
422 static int
423 journal_status_vfs_journal(struct mount *mp,
424 		       const struct mountctl_status_journal *info,
425 		       struct mountctl_journal_ret_status *rstat,
426 		       int buflen, int *res)
427 {
428     struct journal *jo;
429     int error = 0;
430     int index;
431 
432     index = 0;
433     *res = 0;
434     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
435 	if (info->index == MC_JOURNAL_INDEX_ID) {
436 	    if (bcmp(jo->id, info->id, sizeof(jo->id)) != 0)
437 		continue;
438 	} else if (info->index >= 0) {
439 	    if (info->index < index)
440 		continue;
441 	} else if (info->index != MC_JOURNAL_INDEX_ALL) {
442 	    continue;
443 	}
444 	if (buflen < sizeof(*rstat)) {
445 	    if (*res)
446 		rstat[-1].flags |= MC_JOURNAL_STATUS_MORETOCOME;
447 	    else
448 		error = EINVAL;
449 	    break;
450 	}
451 	bzero(rstat, sizeof(*rstat));
452 	rstat->recsize = sizeof(*rstat);
453 	bcopy(jo->id, rstat->id, sizeof(jo->id));
454 	rstat->index = index;
455 	rstat->membufsize = jo->fifo.size;
456 	rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
457 	rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
458 	rstat->bytessent = jo->total_acked;
459 	++rstat;
460 	++index;
461 	*res += sizeof(*rstat);
462 	buflen -= sizeof(*rstat);
463     }
464     return(error);
465 }
466 
467 /*
468  * The per-journal worker thread is responsible for writing out the
469  * journal's FIFO to the target stream.
470  */
471 static void
472 journal_wthread(void *info)
473 {
474     struct journal *jo = info;
475     struct journal_rawrecbeg *rawp;
476     int bytes;
477     int error;
478     int avail;
479     int res;
480 
481     for (;;) {
482 	/*
483 	 * Calculate the number of bytes available to write.  This buffer
484 	 * area may contain reserved records so we can't just write it out
485 	 * without further checks.
486 	 */
487 	bytes = jo->fifo.windex - jo->fifo.rindex;
488 
489 	/*
490 	 * sleep if no bytes are available or if an incomplete record is
491 	 * encountered (it needs to be filled in before we can write it
492 	 * out), and skip any pad records that we encounter.
493 	 */
494 	if (bytes == 0) {
495 	    if (jo->flags & MC_JOURNAL_STOP_REQ)
496 		break;
497 	    tsleep(&jo->fifo, 0, "jfifo", hz);
498 	    continue;
499 	}
500 
501 	/*
502 	 * Sleep if we can not go any further due to hitting an incomplete
503 	 * record.  This case should occur rarely but may have to be better
504 	 * optimized XXX.
505 	 */
506 	rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
507 	if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
508 	    tsleep(&jo->fifo, 0, "jpad", hz);
509 	    continue;
510 	}
511 
512 	/*
513 	 * Skip any pad records.  We do not write out pad records if we can
514 	 * help it.
515 	 *
516 	 * If xindex is caught up to rindex it gets incremented along with
517 	 * rindex.  XXX SMP
518 	 */
519 	if (rawp->streamid == JREC_STREAMID_PAD) {
520 	    if (jo->fifo.rindex == jo->fifo.xindex)
521 		jo->fifo.xindex += (rawp->recsize + 15) & ~15;
522 	    jo->fifo.rindex += (rawp->recsize + 15) & ~15;
523 	    jo->total_acked += bytes;
524 	    KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
525 	    continue;
526 	}
527 
528 	/*
529 	 * 'bytes' is the amount of data that can potentially be written out.
530 	 * Calculate 'res', the amount of data that can actually be written
531 	 * out.  res is bounded either by hitting the end of the physical
532 	 * memory buffer or by hitting an incomplete record.  Incomplete
533 	 * records often occur due to the way the space reservation model
534 	 * works.
535 	 */
536 	res = 0;
537 	avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
538 	while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
539 	    res += (rawp->recsize + 15) & ~15;
540 	    if (res >= avail) {
541 		KKASSERT(res == avail);
542 		break;
543 	    }
544 	    rawp = (void *)((char *)rawp + ((rawp->recsize + 15) & ~15));
545 	}
546 
547 	/*
548 	 * Issue the write and deal with any errors or other conditions.
549 	 * For now assume blocking I/O.  Since we are record-aware the
550 	 * code cannot yet handle partial writes.
551 	 *
552 	 * XXX EWOULDBLOCK/NBIO
553 	 * XXX notification on failure
554 	 * XXX permanent verses temporary failures
555 	 * XXX two-way acknowledgement stream in the return direction / xindex
556 	 */
557 	bytes = res;
558 	error = fp_write(jo->fp,
559 			jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
560 			bytes, &res);
561 	if (error) {
562 	    printf("journal_thread(%s) write, error %d\n", jo->id, error);
563 	    /* XXX */
564 	} else {
565 	    KKASSERT(res == bytes);
566 	}
567 
568 	/*
569 	 * Advance rindex.  If the journal stream is not full duplex we also
570 	 * advance xindex, otherwise the rjournal thread is responsible for
571 	 * advancing xindex.
572 	 */
573 	jo->fifo.rindex += bytes;
574 	if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
575 	    jo->fifo.xindex += bytes;
576 	jo->total_acked += bytes;
577 	KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
578 	if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
579 	    if (jo->flags & MC_JOURNAL_WWAIT) {
580 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
581 		wakeup(&jo->fifo.windex);
582 	    }
583 	}
584     }
585     jo->flags &= ~MC_JOURNAL_WACTIVE;
586     wakeup(jo);
587     wakeup(&jo->fifo.windex);
588 }
589 
590 /*
591  * A second per-journal worker thread is created for two-way journaling
592  * streams to deal with the return acknowledgement stream.
593  */
594 static void
595 journal_rthread(void *info)
596 {
597     struct journal_rawrecbeg *rawp;
598     struct journal_ackrecord ack;
599     struct journal *jo = info;
600     int64_t transid;
601     int error;
602     int count;
603     int bytes;
604     int index;
605 
606     transid = 0;
607     error = 0;
608 
609     for (;;) {
610 	/*
611 	 * We have been asked to stop
612 	 */
613 	if (jo->flags & MC_JOURNAL_STOP_REQ)
614 		break;
615 
616 	/*
617 	 * If we have no active transaction id, get one from the return
618 	 * stream.
619 	 */
620 	if (transid == 0) {
621 	    for (index = 0; index < sizeof(ack); index += count) {
622 		error = fp_read(jo->fp, &ack, sizeof(ack), &count);
623 		if (error)
624 		    break;
625 		if (count == 0)
626 		    tsleep(&jo->fifo.xindex, 0, "jread", hz);
627 	    }
628 	    if (error) {
629 		printf("read error %d on receive stream\n", error);
630 		break;
631 	    }
632 	    if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
633 		ack.rend.endmagic != JREC_ENDMAGIC
634 	    ) {
635 		printf("bad begmagic or endmagic on receive stream\n");
636 		break;
637 	    }
638 	    transid = ack.rbeg.transid;
639 	}
640 
641 	/*
642 	 * Calculate the number of unacknowledged bytes.  If there are no
643 	 * unacknowledged bytes then unsent data was acknowledged, report,
644 	 * sleep a bit, and loop in that case.  This should not happen
645 	 * normally.  The ack record is thrown away.
646 	 */
647 	bytes = jo->fifo.rindex - jo->fifo.xindex;
648 
649 	if (bytes == 0) {
650 	    printf("warning: unsent data acknowledged\n");
651 	    tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
652 	    transid = 0;
653 	    continue;
654 	}
655 
656 	/*
657 	 * Since rindex has advanceted, the record pointed to by xindex
658 	 * must be a valid record.
659 	 */
660 	rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
661 	KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
662 	KKASSERT(rawp->recsize <= bytes);
663 
664 	/*
665 	 * The target can acknowledge several records at once.
666 	 */
667 	if (rawp->transid < transid) {
668 	    printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
669 	    jo->fifo.xindex += (rawp->recsize + 15) & ~15;
670 	    if (jo->flags & MC_JOURNAL_WWAIT) {
671 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
672 		wakeup(&jo->fifo.windex);
673 	    }
674 	    continue;
675 	}
676 	if (rawp->transid == transid) {
677 	    printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
678 	    jo->fifo.xindex += (rawp->recsize + 15) & ~15;
679 	    if (jo->flags & MC_JOURNAL_WWAIT) {
680 		jo->flags &= ~MC_JOURNAL_WWAIT;	/* XXX hysteresis */
681 		wakeup(&jo->fifo.windex);
682 	    }
683 	    transid = 0;
684 	    continue;
685 	}
686 	printf("warning: unsent data(2) acknowledged\n");
687 	transid = 0;
688     }
689     jo->flags &= ~MC_JOURNAL_RACTIVE;
690     wakeup(jo);
691     wakeup(&jo->fifo.windex);
692 }
693 
694 /*
695  * This builds a pad record which the journaling thread will skip over.  Pad
696  * records are required when we are unable to reserve sufficient stream space
697  * due to insufficient space at the end of the physical memory fifo.
698  *
699  * Even though the record is not transmitted, a normal transid must be
700  * assigned to it so link recovery operations after a failure work properly.
701  */
702 static
703 void
704 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
705 {
706     struct journal_rawrecend *rendp;
707 
708     KKASSERT((recsize & 15) == 0 && recsize >= 16);
709 
710     rawp->streamid = JREC_STREAMID_PAD;
711     rawp->recsize = recsize;	/* must be 16-byte aligned */
712     rawp->transid = transid;
713     /*
714      * WARNING, rendp may overlap rawp->seqno.  This is necessary to
715      * allow PAD records to fit in 16 bytes.  Use cpu_ccfence() to
716      * hopefully cause the compiler to not make any assumptions.
717      */
718     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
719     rendp->endmagic = JREC_ENDMAGIC;
720     rendp->check = 0;
721     rendp->recsize = rawp->recsize;
722 
723     /*
724      * Set the begin magic last.  This is what will allow the journal
725      * thread to write the record out.  Use a store fence to prevent
726      * compiler and cpu reordering of the writes.
727      */
728     cpu_sfence();
729     rawp->begmagic = JREC_BEGMAGIC;
730 }
731 
732 /*
733  * Wake up the worker thread if the FIFO is more then half full or if
734  * someone is waiting for space to be freed up.  Otherwise let the
735  * heartbeat deal with it.  Being able to avoid waking up the worker
736  * is the key to the journal's cpu performance.
737  */
738 static __inline
739 void
740 journal_commit_wakeup(struct journal *jo)
741 {
742     int avail;
743 
744     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
745     KKASSERT(avail >= 0);
746     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
747 	wakeup(&jo->fifo);
748 }
749 
750 /*
751  * Create a new BEGIN stream record with the specified streamid and the
752  * specified amount of payload space.  *rawpp will be set to point to the
753  * base of the new stream record and a pointer to the base of the payload
754  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
755  * making this call.  The raw record header will be partially initialized.
756  *
757  * A stream can be extended, aborted, or committed by other API calls
758  * below.  This may result in a sequence of potentially disconnected
759  * stream records to be output to the journaling target.  The first record
760  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
761  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
762  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
763  * up being the same as the first, in which case the bits are all set in
764  * the first record.
765  *
766  * The stream record is created in an incomplete state by setting the begin
767  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
768  * flushing the fifo past our record until we have finished populating it.
769  * Other threads can reserve and operate on their own space without stalling
770  * but the stream output will stall until we have completed operations.  The
771  * memory FIFO is intended to be large enough to absorb such situations
772  * without stalling out other threads.
773  */
774 static
775 void *
776 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
777 		int16_t streamid, int bytes)
778 {
779     struct journal_rawrecbeg *rawp;
780     int avail;
781     int availtoend;
782     int req;
783 
784     /*
785      * Add header and trailer overheads to the passed payload.  Note that
786      * the passed payload size need not be aligned in any way.
787      */
788     bytes += sizeof(struct journal_rawrecbeg);
789     bytes += sizeof(struct journal_rawrecend);
790 
791     for (;;) {
792 	/*
793 	 * First, check boundary conditions.  If the request would wrap around
794 	 * we have to skip past the ending block and return to the beginning
795 	 * of the FIFO's buffer.  Calculate 'req' which is the actual number
796 	 * of bytes being reserved, including wrap-around dead space.
797 	 *
798 	 * Neither 'bytes' or 'req' are aligned.
799 	 *
800 	 * Note that availtoend is not truncated to avail and so cannot be
801 	 * used to determine whether the reservation is possible by itself.
802 	 * Also, since all fifo ops are 16-byte aligned, we can check
803 	 * the size before calculating the aligned size.
804 	 */
805 	availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
806 	KKASSERT((availtoend & 15) == 0);
807 	if (bytes > availtoend)
808 	    req = bytes + availtoend;	/* add pad to end */
809 	else
810 	    req = bytes;
811 
812 	/*
813 	 * Next calculate the total available space and see if it is
814 	 * sufficient.  We cannot overwrite previously buffered data
815 	 * past xindex because otherwise we would not be able to restart
816 	 * a broken link at the target's last point of commit.
817 	 */
818 	avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
819 	KKASSERT(avail >= 0 && (avail & 15) == 0);
820 
821 	if (avail < req) {
822 	    /* XXX MC_JOURNAL_STOP_IMM */
823 	    jo->flags |= MC_JOURNAL_WWAIT;
824 	    tsleep(&jo->fifo.windex, 0, "jwrite", 0);
825 	    continue;
826 	}
827 
828 	/*
829 	 * Create a pad record for any dead space and create an incomplete
830 	 * record for the live space, then return a pointer to the
831 	 * contiguous buffer space that was requested.
832 	 *
833 	 * NOTE: The worker thread will not flush past an incomplete
834 	 * record, so the reserved space can be filled in at-will.  The
835 	 * journaling code must also be aware the reserved sections occuring
836 	 * after this one will also not be written out even if completed
837 	 * until this one is completed.
838 	 *
839 	 * The transaction id must accomodate real and potential pad creation.
840 	 */
841 	rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
842 	if (req != bytes) {
843 	    journal_build_pad(rawp, availtoend, jo->transid);
844 	    ++jo->transid;
845 	    rawp = (void *)jo->fifo.membase;
846 	}
847 	rawp->begmagic = JREC_INCOMPLETEMAGIC;	/* updated by abort/commit */
848 	rawp->recsize = bytes;			/* (unaligned size) */
849 	rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
850 	rawp->transid = jo->transid;
851 	jo->transid += 2;
852 
853 	/*
854 	 * Issue a memory barrier to guarentee that the record data has been
855 	 * properly initialized before we advance the write index and return
856 	 * a pointer to the reserved record.  Otherwise the worker thread
857 	 * could accidently run past us.
858 	 *
859 	 * Note that stream records are always 16-byte aligned.
860 	 */
861 	cpu_sfence();
862 	jo->fifo.windex += (req + 15) & ~15;
863 	*rawpp = rawp;
864 	return(rawp + 1);
865     }
866     /* not reached */
867     *rawpp = NULL;
868     return(NULL);
869 }
870 
871 /*
872  * Attempt to extend the stream record by <bytes> worth of payload space.
873  *
874  * If it is possible to extend the existing stream record no truncation
875  * occurs and the record is extended as specified.  A pointer to the
876  * truncation offset within the payload space is returned.
877  *
878  * If it is not possible to do this the existing stream record is truncated
879  * and committed, and a new stream record of size <bytes> is created.  A
880  * pointer to the base of the new stream record's payload space is returned.
881  *
882  * *rawpp is set to the new reservation in the case of a new record but
883  * the caller cannot depend on a comparison with the old rawp to determine if
884  * this case occurs because we could end up using the same memory FIFO
885  * offset for the new stream record.  Use *newstreamrecp instead.
886  */
887 static void *
888 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp,
889 		int truncbytes, int bytes, int *newstreamrecp)
890 {
891     struct journal_rawrecbeg *rawp;
892     int16_t streamid;
893     int availtoend;
894     int avail;
895     int osize;
896     int nsize;
897     int wbase;
898     void *rptr;
899 
900     *newstreamrecp = 0;
901     rawp = *rawpp;
902     osize = (rawp->recsize + 15) & ~15;
903     nsize = (rawp->recsize + bytes + 15) & ~15;
904     wbase = (char *)rawp - jo->fifo.membase;
905 
906     /*
907      * If the aligned record size does not change we can trivially adjust
908      * the record size.
909      */
910     if (nsize == osize) {
911 	rawp->recsize += bytes;
912 	return((char *)(rawp + 1) + truncbytes);
913     }
914 
915     /*
916      * If the fifo's write index hasn't been modified since we made the
917      * reservation and we do not hit any boundary conditions, we can
918      * trivially make the record smaller or larger.
919      */
920     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
921 	availtoend = jo->fifo.size - wbase;
922 	avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
923 	KKASSERT((availtoend & 15) == 0);
924 	KKASSERT((avail & 15) == 0);
925 	if (nsize <= avail && nsize <= availtoend) {
926 	    jo->fifo.windex += nsize - osize;
927 	    rawp->recsize += bytes;
928 	    return((char *)(rawp + 1) + truncbytes);
929 	}
930     }
931 
932     /*
933      * It was not possible to extend the buffer.  Commit the current
934      * buffer and create a new one.  We manually clear the BEGIN mark that
935      * journal_reserve() creates (because this is a continuing record, not
936      * the start of a new stream).
937      */
938     streamid = rawp->streamid & JREC_STREAMID_MASK;
939     journal_commit(jo, rawpp, truncbytes, 0);
940     rptr = journal_reserve(jo, rawpp, streamid, bytes);
941     rawp = *rawpp;
942     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
943     *newstreamrecp = 1;
944     return(rptr);
945 }
946 
947 /*
948  * Abort a journal record.  If the transaction record represents a stream
949  * BEGIN and we can reverse the fifo's write index we can simply reverse
950  * index the entire record, as if it were never reserved in the first place.
951  *
952  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
953  * with the payload truncated to 0 bytes.
954  */
955 static void
956 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
957 {
958     struct journal_rawrecbeg *rawp;
959     int osize;
960 
961     rawp = *rawpp;
962     osize = (rawp->recsize + 15) & ~15;
963 
964     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
965 	(jo->fifo.windex & jo->fifo.mask) ==
966 	 (char *)rawp - jo->fifo.membase + osize)
967     {
968 	jo->fifo.windex -= osize;
969 	*rawpp = NULL;
970     } else {
971 	rawp->streamid |= JREC_STREAMCTL_ABORTED;
972 	journal_commit(jo, rawpp, 0, 1);
973     }
974 }
975 
976 /*
977  * Commit a journal record and potentially truncate it to the specified
978  * number of payload bytes.  If you do not want to truncate the record,
979  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
980  * field includes header and trailer and will not be correct.  Note that
981  * passing 0 will truncate the entire data payload of the record.
982  *
983  * The logical stream is terminated by this function.
984  *
985  * If truncation occurs, and it is not possible to physically optimize the
986  * memory FIFO due to other threads having reserved space after ours,
987  * the remaining reserved space will be covered by a pad record.
988  */
989 static void
990 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
991 		int bytes, int closeout)
992 {
993     struct journal_rawrecbeg *rawp;
994     struct journal_rawrecend *rendp;
995     int osize;
996     int nsize;
997 
998     rawp = *rawpp;
999     *rawpp = NULL;
1000 
1001     KKASSERT((char *)rawp >= jo->fifo.membase &&
1002 	     (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
1003     KKASSERT(((intptr_t)rawp & 15) == 0);
1004 
1005     /*
1006      * Truncate the record if necessary.  If the FIFO write index as still
1007      * at the end of our record we can optimally backindex it.  Otherwise
1008      * we have to insert a pad record to cover the dead space.
1009      *
1010      * We calculate osize which is the 16-byte-aligned original recsize.
1011      * We calculate nsize which is the 16-byte-aligned new recsize.
1012      *
1013      * Due to alignment issues or in case the passed truncation bytes is
1014      * the same as the original payload, nsize may be equal to osize even
1015      * if the committed bytes is less then the originally reserved bytes.
1016      */
1017     if (bytes >= 0) {
1018 	KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
1019 	osize = (rawp->recsize + 15) & ~15;
1020 	rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
1021 			sizeof(struct journal_rawrecend);
1022 	nsize = (rawp->recsize + 15) & ~15;
1023 	KKASSERT(nsize <= osize);
1024 	if (osize == nsize) {
1025 	    /* do nothing */
1026 	} else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
1027 	    /* we are able to backindex the fifo */
1028 	    jo->fifo.windex -= osize - nsize;
1029 	} else {
1030 	    /* we cannot backindex the fifo, emplace a pad in the dead space */
1031 	    journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
1032 				rawp->transid + 1);
1033 	}
1034     }
1035 
1036     /*
1037      * Fill in the trailer.  Note that unlike pad records, the trailer will
1038      * never overlap the header.
1039      */
1040     rendp = (void *)((char *)rawp +
1041 	    ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
1042     rendp->endmagic = JREC_ENDMAGIC;
1043     rendp->recsize = rawp->recsize;
1044     rendp->check = 0;		/* XXX check word, disabled for now */
1045 
1046     /*
1047      * Fill in begmagic last.  This will allow the worker thread to proceed.
1048      * Use a memory barrier to guarentee write ordering.  Mark the stream
1049      * as terminated if closeout is set.  This is the typical case.
1050      */
1051     if (closeout)
1052 	rawp->streamid |= JREC_STREAMCTL_END;
1053     cpu_sfence();		/* memory and compiler barrier */
1054     rawp->begmagic = JREC_BEGMAGIC;
1055 
1056     journal_commit_wakeup(jo);
1057 }
1058 
1059 /************************************************************************
1060  *			TRANSACTION SUPPORT ROUTINES			*
1061  ************************************************************************
1062  *
1063  * JRECORD_*() - routines to create subrecord transactions and embed them
1064  *		 in the logical streams managed by the journal_*() routines.
1065  */
1066 
1067 static int16_t sid = JREC_STREAMID_JMIN;
1068 
1069 /*
1070  * Initialize the passed jrecord structure and start a new stream transaction
1071  * by reserving an initial build space in the journal's memory FIFO.
1072  */
1073 static void
1074 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
1075 {
1076     bzero(jrec, sizeof(*jrec));
1077     jrec->jo = jo;
1078     if (streamid < 0) {
1079 	streamid = sid++;	/* XXX need to track stream ids! */
1080 	if (sid == JREC_STREAMID_JMAX)
1081 	    sid = JREC_STREAMID_JMIN;
1082     }
1083     jrec->streamid = streamid;
1084     jrec->stream_residual = JREC_DEFAULTSIZE;
1085     jrec->stream_reserved = jrec->stream_residual;
1086     jrec->stream_ptr =
1087 	journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
1088 }
1089 
1090 /*
1091  * Push a recursive record type.  All pushes should have matching pops.
1092  * The old parent is returned and the newly pushed record becomes the
1093  * new parent.  Note that the old parent's pointer may already be invalid
1094  * or may become invalid if jrecord_write() had to build a new stream
1095  * record, so the caller should not mess with the returned pointer in
1096  * any way other then to save it.
1097  */
1098 static
1099 struct journal_subrecord *
1100 jrecord_push(struct jrecord *jrec, int16_t rectype)
1101 {
1102     struct journal_subrecord *save;
1103 
1104     save = jrec->parent;
1105     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
1106     jrec->last = NULL;
1107     KKASSERT(jrec->parent != NULL);
1108     ++jrec->pushcount;
1109     ++jrec->pushptrgood;	/* cleared on flush */
1110     return(save);
1111 }
1112 
1113 /*
1114  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
1115  * on the last record written within the subtransaction.  If the last
1116  * record written is not accessible or if the subtransaction is empty,
1117  * we must write out a pad record with JMASK_LAST set before popping.
1118  *
1119  * When popping a subtransaction the parent record's recsize field
1120  * will be properly set.  If the parent pointer is no longer valid
1121  * (which can occur if the data has already been flushed out to the
1122  * stream), the protocol spec allows us to leave it 0.
1123  *
1124  * The saved parent pointer which we restore may or may not be valid,
1125  * and if not valid may or may not be NULL, depending on the value
1126  * of pushptrgood.
1127  */
1128 static void
1129 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
1130 {
1131     struct journal_subrecord *last;
1132 
1133     KKASSERT(jrec->pushcount > 0);
1134     KKASSERT(jrec->residual == 0);
1135 
1136     /*
1137      * Set JMASK_LAST on the last record we wrote at the current
1138      * level.  If last is NULL we either no longer have access to the
1139      * record or the subtransaction was empty and we must write out a pad
1140      * record.
1141      */
1142     if ((last = jrec->last) == NULL) {
1143 	jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
1144 	last = jrec->last;	/* reload after possible flush */
1145     } else {
1146 	last->rectype |= JMASK_LAST;
1147     }
1148 
1149     /*
1150      * pushptrgood tells us how many levels of parent record pointers
1151      * are valid.  The jrec only stores the current parent record pointer
1152      * (and it is only valid if pushptrgood != 0).  The higher level parent
1153      * record pointers are saved by the routines calling jrecord_push() and
1154      * jrecord_pop().  These pointers may become stale and we determine
1155      * that fact by tracking the count of valid parent pointers with
1156      * pushptrgood.  Pointers become invalid when their related stream
1157      * record gets pushed out.
1158      *
1159      * If no pointer is available (the data has already been pushed out),
1160      * then no fixup of e.g. the length field is possible for non-leaf
1161      * nodes.  The protocol allows for this situation by placing a larger
1162      * burden on the program scanning the stream on the other end.
1163      *
1164      * [parentA]
1165      *	  [node X]
1166      *    [parentB]
1167      *	     [node Y]
1168      *	     [node Z]
1169      *    (pop B)	see NOTE B
1170      * (pop A)		see NOTE A
1171      *
1172      * NOTE B:	This pop sets LAST in node Z if the node is still accessible,
1173      *		else a PAD record is appended and LAST is set in that.
1174      *
1175      *		This pop sets the record size in parentB if parentB is still
1176      *		accessible, else the record size is left 0 (the scanner must
1177      *		deal with that).
1178      *
1179      *		This pop sets the new 'last' record to parentB, the pointer
1180      *		to which may or may not still be accessible.
1181      *
1182      * NOTE A:	This pop sets LAST in parentB if the node is still accessible,
1183      *		else a PAD record is appended and LAST is set in that.
1184      *
1185      *		This pop sets the record size in parentA if parentA is still
1186      *		accessible, else the record size is left 0 (the scanner must
1187      *		deal with that).
1188      *
1189      *		This pop sets the new 'last' record to parentA, the pointer
1190      *		to which may or may not still be accessible.
1191      *
1192      * Also note that the last record in the stream transaction, which in
1193      * the above example is parentA, does not currently have the LAST bit
1194      * set.
1195      *
1196      * The current parent becomes the last record relative to the
1197      * saved parent passed into us.  It's validity is based on
1198      * whether pushptrgood is non-zero prior to decrementing.  The saved
1199      * parent becomes the new parent, and its validity is based on whether
1200      * pushptrgood is non-zero after decrementing.
1201      *
1202      * The old jrec->parent may be NULL if it is no longer accessible.
1203      * If pushptrgood is non-zero, however, it is guarenteed to not
1204      * be NULL (since no flush occured).
1205      */
1206     jrec->last = jrec->parent;
1207     --jrec->pushcount;
1208     if (jrec->pushptrgood) {
1209 	KKASSERT(jrec->last != NULL && last != NULL);
1210 	if (--jrec->pushptrgood == 0) {
1211 	    jrec->parent = NULL;	/* 'save' contains garbage or NULL */
1212 	} else {
1213 	    KKASSERT(save != NULL);
1214 	    jrec->parent = save;	/* 'save' must not be NULL */
1215 	}
1216 
1217 	/*
1218 	 * Set the record size in the old parent.  'last' still points to
1219 	 * the original last record in the subtransaction being popped,
1220 	 * jrec->last points to the old parent (which became the last
1221 	 * record relative to the new parent being popped into).
1222 	 */
1223 	jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
1224     } else {
1225 	jrec->parent = NULL;
1226 	KKASSERT(jrec->last == NULL);
1227     }
1228 }
1229 
1230 /*
1231  * Write out a leaf record, including associated data.
1232  */
1233 static
1234 void
1235 jrecord_leaf(struct jrecord *jrec, int16_t rectype, void *ptr, int bytes)
1236 {
1237     jrecord_write(jrec, rectype, bytes);
1238     jrecord_data(jrec, ptr, bytes);
1239 }
1240 
1241 /*
1242  * Write a leaf record out and return a pointer to its base.  The leaf
1243  * record may contain potentially megabytes of data which is supplied
1244  * in jrecord_data() calls.  The exact amount must be specified in this
1245  * call.
1246  *
1247  * THE RETURNED SUBRECORD POINTER IS ONLY VALID IMMEDIATELY AFTER THE
1248  * CALL AND MAY BECOME INVALID AT ANY TIME.  ONLY THE PUSH/POP CODE SHOULD
1249  * USE THE RETURN VALUE.
1250  */
1251 static
1252 struct journal_subrecord *
1253 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1254 {
1255     struct journal_subrecord *last;
1256     int pusheditout;
1257 
1258     /*
1259      * Try to catch some obvious errors.  Nesting records must specify a
1260      * size of 0, and there should be no left-overs from previous operations
1261      * (such as incomplete data writeouts).
1262      */
1263     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1264     KKASSERT(jrec->residual == 0);
1265 
1266     /*
1267      * Check to see if the current stream record has enough room for
1268      * the new subrecord header.  If it doesn't we extend the current
1269      * stream record.
1270      *
1271      * This may have the side effect of pushing out the current stream record
1272      * and creating a new one.  We must adjust our stream tracking fields
1273      * accordingly.
1274      */
1275     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1276 	jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1277 				jrec->stream_reserved - jrec->stream_residual,
1278 				JREC_DEFAULTSIZE, &pusheditout);
1279 	if (pusheditout) {
1280 	    /*
1281 	     * If a pushout occured, the pushed out stream record was
1282 	     * truncated as specified and the new record is exactly the
1283 	     * extension size specified.
1284 	     */
1285 	    jrec->stream_reserved = JREC_DEFAULTSIZE;
1286 	    jrec->stream_residual = JREC_DEFAULTSIZE;
1287 	    jrec->parent = NULL;	/* no longer accessible */
1288 	    jrec->pushptrgood = 0;	/* restored parents in pops no good */
1289 	} else {
1290 	    /*
1291 	     * If no pushout occured the stream record is NOT truncated and
1292 	     * IS extended.
1293 	     */
1294 	    jrec->stream_reserved += JREC_DEFAULTSIZE;
1295 	    jrec->stream_residual += JREC_DEFAULTSIZE;
1296 	}
1297     }
1298     last = (void *)jrec->stream_ptr;
1299     last->rectype = rectype;
1300     last->reserved = 0;
1301     last->recsize = sizeof(struct journal_subrecord) + bytes;
1302     jrec->last = last;
1303     jrec->residual = bytes;		/* remaining data to be posted */
1304     jrec->residual_align = -bytes & 7;	/* post-data alignment required */
1305     jrec->stream_ptr += sizeof(*last);	/* current write pointer */
1306     jrec->stream_residual -= sizeof(*last); /* space remaining in stream */
1307     return(last);
1308 }
1309 
1310 /*
1311  * Write out the data associated with a leaf record.  Any number of calls
1312  * to this routine may be made as long as the byte count adds up to the
1313  * amount originally specified in jrecord_write().
1314  *
1315  * The act of writing out the leaf data may result in numerous stream records
1316  * being pushed out.   Callers should be aware that even the associated
1317  * subrecord header may become inaccessible due to stream record pushouts.
1318  */
1319 static void
1320 jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
1321 {
1322     int pusheditout;
1323     int extsize;
1324 
1325     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1326 
1327     /*
1328      * Push out stream records as long as there is insufficient room to hold
1329      * the remaining data.
1330      */
1331     while (jrec->stream_residual < bytes) {
1332 	/*
1333 	 * Fill in any remaining space in the current stream record.
1334 	 */
1335 	bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1336 	buf = (const char *)buf + jrec->stream_residual;
1337 	bytes -= jrec->stream_residual;
1338 	/*jrec->stream_ptr += jrec->stream_residual;*/
1339 	jrec->residual -= jrec->stream_residual;
1340 	jrec->stream_residual = 0;
1341 
1342 	/*
1343 	 * Try to extend the current stream record, but no more then 1/4
1344 	 * the size of the FIFO.
1345 	 */
1346 	extsize = jrec->jo->fifo.size >> 2;
1347 	if (extsize > bytes)
1348 	    extsize = (bytes + 15) & ~15;
1349 
1350 	jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1351 				jrec->stream_reserved - jrec->stream_residual,
1352 				extsize, &pusheditout);
1353 	if (pusheditout) {
1354 	    jrec->stream_reserved = extsize;
1355 	    jrec->stream_residual = extsize;
1356 	    jrec->parent = NULL;	/* no longer accessible */
1357 	    jrec->last = NULL;		/* no longer accessible */
1358 	    jrec->pushptrgood = 0;	/* restored parents in pops no good */
1359 	} else {
1360 	    jrec->stream_reserved += extsize;
1361 	    jrec->stream_residual += extsize;
1362 	}
1363     }
1364 
1365     /*
1366      * Push out any remaining bytes into the current stream record.
1367      */
1368     if (bytes) {
1369 	bcopy(buf, jrec->stream_ptr, bytes);
1370 	jrec->stream_ptr += bytes;
1371 	jrec->stream_residual -= bytes;
1372 	jrec->residual -= bytes;
1373     }
1374 
1375     /*
1376      * Handle data alignment requirements for the subrecord.  Because the
1377      * stream record's data space is more strictly aligned, it must already
1378      * have sufficient space to hold any subrecord alignment slop.
1379      */
1380     if (jrec->residual == 0 && jrec->residual_align) {
1381 	KKASSERT(jrec->residual_align <= jrec->stream_residual);
1382 	bzero(jrec->stream_ptr, jrec->residual_align);
1383 	jrec->stream_ptr += jrec->residual_align;
1384 	jrec->stream_residual -= jrec->residual_align;
1385 	jrec->residual_align = 0;
1386     }
1387 }
1388 
1389 /*
1390  * We are finished with the transaction.  This closes the transaction created
1391  * by jrecord_init().
1392  *
1393  * NOTE: If abortit is not set then we must be at the top level with no
1394  *	 residual subrecord data left to output.
1395  *
1396  *	 If abortit is set then we can be in any state, all pushes will be
1397  *	 popped and it is ok for there to be residual data.  This works
1398  *	 because the virtual stream itself is truncated.  Scanners must deal
1399  *	 with this situation.
1400  *
1401  * The stream record will be committed or aborted as specified and jrecord
1402  * resources will be cleaned up.
1403  */
1404 static void
1405 jrecord_done(struct jrecord *jrec, int abortit)
1406 {
1407     KKASSERT(jrec->rawp != NULL);
1408 
1409     if (abortit) {
1410 	journal_abort(jrec->jo, &jrec->rawp);
1411     } else {
1412 	KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1413 	journal_commit(jrec->jo, &jrec->rawp,
1414 			jrec->stream_reserved - jrec->stream_residual, 1);
1415     }
1416 
1417     /*
1418      * jrec should not be used beyond this point without another init,
1419      * but clean up some fields to ensure that we panic if it is.
1420      *
1421      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1422      */
1423     jrec->jo = NULL;
1424     jrec->stream_ptr = NULL;
1425 }
1426 
1427 /************************************************************************
1428  *			LOW LEVEL RECORD SUPPORT ROUTINES		*
1429  ************************************************************************
1430  *
1431  * These routine create low level recursive and leaf subrecords representing
1432  * common filesystem structures.
1433  */
1434 
1435 /*
1436  * Write out a filename path relative to the base of the mount point.
1437  * rectype is typically JLEAF_PATH{1,2,3,4}.
1438  */
1439 static void
1440 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1441 {
1442     char buf[64];	/* local buffer if it fits, else malloced */
1443     char *base;
1444     int pathlen;
1445     int index;
1446     struct namecache *scan;
1447 
1448     /*
1449      * Pass 1 - figure out the number of bytes required.  Include terminating
1450      * 	       \0 on last element and '/' separator on other elements.
1451      */
1452 again:
1453     pathlen = 0;
1454     for (scan = ncp;
1455 	 scan && (scan->nc_flag & NCF_MOUNTPT) == 0;
1456 	 scan = scan->nc_parent
1457     ) {
1458 	pathlen += scan->nc_nlen + 1;
1459     }
1460 
1461     if (pathlen <= sizeof(buf))
1462 	base = buf;
1463     else
1464 	base = malloc(pathlen, M_TEMP, M_INTWAIT);
1465 
1466     /*
1467      * Pass 2 - generate the path buffer
1468      */
1469     index = pathlen;
1470     for (scan = ncp;
1471 	 scan && (scan->nc_flag & NCF_MOUNTPT) == 0;
1472 	 scan = scan->nc_parent
1473     ) {
1474 	if (scan->nc_nlen >= index) {
1475 	    if (base != buf)
1476 		free(base, M_TEMP);
1477 	    goto again;
1478 	}
1479 	if (index == pathlen)
1480 	    base[--index] = 0;
1481 	else
1482 	    base[--index] = '/';
1483 	index -= scan->nc_nlen;
1484 	bcopy(scan->nc_name, base + index, scan->nc_nlen);
1485     }
1486     jrecord_leaf(jrec, rectype, base + index, pathlen - index);
1487     if (base != buf)
1488 	free(base, M_TEMP);
1489 }
1490 
1491 /*
1492  * Write out a file attribute structure.  While somewhat inefficient, using
1493  * a recursive data structure is the most portable and extensible way.
1494  */
1495 static void
1496 jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
1497 {
1498     void *save;
1499 
1500     save = jrecord_push(jrec, JTYPE_VATTR);
1501     if (vat->va_type != VNON)
1502 	jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
1503     if (vat->va_uid != VNOVAL)
1504 	jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
1505     if (vat->va_nlink != VNOVAL)
1506 	jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
1507     if (vat->va_uid != VNOVAL)
1508 	jrecord_leaf(jrec, JLEAF_UID, &vat->va_uid, sizeof(vat->va_uid));
1509     if (vat->va_gid != VNOVAL)
1510 	jrecord_leaf(jrec, JLEAF_GID, &vat->va_gid, sizeof(vat->va_gid));
1511     if (vat->va_fsid != VNOVAL)
1512 	jrecord_leaf(jrec, JLEAF_FSID, &vat->va_fsid, sizeof(vat->va_fsid));
1513     if (vat->va_fileid != VNOVAL)
1514 	jrecord_leaf(jrec, JLEAF_INUM, &vat->va_fileid, sizeof(vat->va_fileid));
1515     if (vat->va_size != VNOVAL)
1516 	jrecord_leaf(jrec, JLEAF_SIZE, &vat->va_size, sizeof(vat->va_size));
1517     if (vat->va_atime.tv_sec != VNOVAL)
1518 	jrecord_leaf(jrec, JLEAF_ATIME, &vat->va_atime, sizeof(vat->va_atime));
1519     if (vat->va_mtime.tv_sec != VNOVAL)
1520 	jrecord_leaf(jrec, JLEAF_MTIME, &vat->va_mtime, sizeof(vat->va_mtime));
1521     if (vat->va_ctime.tv_sec != VNOVAL)
1522 	jrecord_leaf(jrec, JLEAF_CTIME, &vat->va_ctime, sizeof(vat->va_ctime));
1523     if (vat->va_gen != VNOVAL)
1524 	jrecord_leaf(jrec, JLEAF_GEN, &vat->va_gen, sizeof(vat->va_gen));
1525     if (vat->va_flags != VNOVAL)
1526 	jrecord_leaf(jrec, JLEAF_FLAGS, &vat->va_flags, sizeof(vat->va_flags));
1527     if (vat->va_rdev != VNOVAL)
1528 	jrecord_leaf(jrec, JLEAF_UDEV, &vat->va_rdev, sizeof(vat->va_rdev));
1529 #if 0
1530     if (vat->va_filerev != VNOVAL)
1531 	jrecord_leaf(jrec, JLEAF_FILEREV, &vat->va_filerev, sizeof(vat->va_filerev));
1532 #endif
1533     jrecord_pop(jrec, save);
1534 }
1535 
1536 /*
1537  * Write out the creds used to issue a file operation.  If a process is
1538  * available write out additional tracking information related to the
1539  * process.
1540  *
1541  * XXX additional tracking info
1542  * XXX tty line info
1543  */
1544 static void
1545 jrecord_write_cred(struct jrecord *jrec, struct thread *td, struct ucred *cred)
1546 {
1547     void *save;
1548     struct proc *p;
1549 
1550     save = jrecord_push(jrec, JTYPE_CRED);
1551     jrecord_leaf(jrec, JLEAF_UID, &cred->cr_uid, sizeof(cred->cr_uid));
1552     jrecord_leaf(jrec, JLEAF_GID, &cred->cr_gid, sizeof(cred->cr_gid));
1553     if (td && (p = td->td_proc) != NULL) {
1554 	jrecord_leaf(jrec, JLEAF_PID, &p->p_pid, sizeof(p->p_pid));
1555 	jrecord_leaf(jrec, JLEAF_COMM, p->p_comm, sizeof(p->p_comm));
1556     }
1557     jrecord_pop(jrec, save);
1558 }
1559 
1560 /*
1561  * Write out information required to identify a vnode
1562  *
1563  * XXX this needs work.  We should write out the inode number as well,
1564  * and in fact avoid writing out the file path for seqential writes
1565  * occuring within e.g. a certain period of time.
1566  */
1567 static void
1568 jrecord_write_vnode_ref(struct jrecord *jrec, struct vnode *vp)
1569 {
1570     struct namecache *ncp;
1571 
1572     TAILQ_FOREACH(ncp, &vp->v_namecache, nc_vnode) {
1573 	if ((ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1574 	    break;
1575     }
1576     if (ncp)
1577 	jrecord_write_path(jrec, JLEAF_PATH_REF, ncp);
1578 }
1579 
1580 #if 0
1581 /*
1582  * Write out the current contents of the file within the specified
1583  * range.  This is typically called from within an UNDO section.  A
1584  * locked vnode must be passed.
1585  */
1586 static int
1587 jrecord_write_filearea(struct jrecord *jrec, struct vnode *vp,
1588 			off_t begoff, off_t endoff)
1589 {
1590 }
1591 #endif
1592 
1593 /*
1594  * Write out the data represented by a pagelist
1595  */
1596 static void
1597 jrecord_write_pagelist(struct jrecord *jrec, int16_t rectype,
1598 			struct vm_page **pglist, int *rtvals, int pgcount,
1599 			off_t offset)
1600 {
1601     struct msf_buf *msf;
1602     int error;
1603     int b;
1604     int i;
1605 
1606     i = 0;
1607     while (i < pgcount) {
1608 	/*
1609 	 * Find the next valid section.  Skip any invalid elements
1610 	 */
1611 	if (rtvals[i] != VM_PAGER_OK) {
1612 	    ++i;
1613 	    offset += PAGE_SIZE;
1614 	    continue;
1615 	}
1616 
1617 	/*
1618 	 * Figure out how big the valid section is, capping I/O at what the
1619 	 * MSFBUF can represent.
1620 	 */
1621 	b = i;
1622 	while (i < pgcount && i - b != XIO_INTERNAL_PAGES &&
1623 	       rtvals[i] == VM_PAGER_OK
1624 	) {
1625 	    ++i;
1626 	}
1627 
1628 	/*
1629 	 * And write it out.
1630 	 */
1631 	if (i - b) {
1632 	    error = msf_map_pagelist(&msf, pglist + b, i - b, 0);
1633 	    if (error == 0) {
1634 		printf("RECORD PUTPAGES %d\n", msf_buf_bytes(msf));
1635 		jrecord_leaf(jrec, JLEAF_SEEKPOS, &offset, sizeof(offset));
1636 		jrecord_leaf(jrec, rectype,
1637 			     msf_buf_kva(msf), msf_buf_bytes(msf));
1638 		msf_buf_free(msf);
1639 	    } else {
1640 		printf("jrecord_write_pagelist: mapping failure\n");
1641 	    }
1642 	    offset += (off_t)(i - b) << PAGE_SHIFT;
1643 	}
1644     }
1645 }
1646 
1647 /*
1648  * Write out the data represented by a UIO.
1649  */
1650 struct jwuio_info {
1651     struct jrecord *jrec;
1652     int16_t rectype;
1653 };
1654 
1655 static int jrecord_write_uio_callback(void *info, char *buf, int bytes);
1656 
1657 static void
1658 jrecord_write_uio(struct jrecord *jrec, int16_t rectype, struct uio *uio)
1659 {
1660     struct jwuio_info info = { jrec, rectype };
1661     int error;
1662 
1663     if (uio->uio_segflg != UIO_NOCOPY) {
1664 	jrecord_leaf(jrec, JLEAF_SEEKPOS, &uio->uio_offset,
1665 		     sizeof(uio->uio_offset));
1666 	error = msf_uio_iterate(uio, jrecord_write_uio_callback, &info);
1667 	if (error)
1668 	    printf("XXX warning uio iterate failed %d\n", error);
1669     }
1670 }
1671 
1672 static int
1673 jrecord_write_uio_callback(void *info_arg, char *buf, int bytes)
1674 {
1675     struct jwuio_info *info = info_arg;
1676 
1677     jrecord_leaf(info->jrec, info->rectype, buf, bytes);
1678     return(0);
1679 }
1680 
1681 /************************************************************************
1682  *			JOURNAL VNOPS					*
1683  ************************************************************************
1684  *
1685  * These are function shims replacing the normal filesystem ops.  We become
1686  * responsible for calling the underlying filesystem ops.  We have the choice
1687  * of executing the underlying op first and then generating the journal entry,
1688  * or starting the journal entry, executing the underlying op, and then
1689  * either completing or aborting it.
1690  *
1691  * The journal is supposed to be a high-level entity, which generally means
1692  * identifying files by name rather then by inode.  Supplying both allows
1693  * the journal to be used both for inode-number-compatible 'mirrors' and
1694  * for simple filesystem replication.
1695  *
1696  * Writes are particularly difficult to deal with because a single write may
1697  * represent a hundred megabyte buffer or more, and both writes and truncations
1698  * require the 'old' data to be written out as well as the new data if the
1699  * log is reversable.  Other issues:
1700  *
1701  * - How to deal with operations on unlinked files (no path available),
1702  *   but which may still be filesystem visible due to hard links.
1703  *
1704  * - How to deal with modifications made via a memory map.
1705  *
1706  * - Future cache coherency support will require cache coherency API calls
1707  *   both prior to and after the call to the underlying VFS.
1708  *
1709  * ALSO NOTE: We do not have to shim compatibility VOPs like MKDIR which have
1710  * new VFS equivalents (NMKDIR).
1711  */
1712 
1713 /*
1714  * Journal vop_settattr { a_vp, a_vap, a_cred, a_td }
1715  */
1716 static
1717 int
1718 journal_setattr(struct vop_setattr_args *ap)
1719 {
1720     struct mount *mp;
1721     struct journal *jo;
1722     struct jrecord jrec;
1723     void *save;		/* warning, save pointers do not always remain valid */
1724     int error;
1725 
1726     error = vop_journal_operate_ap(&ap->a_head);
1727     mp = ap->a_head.a_ops->vv_mount;
1728     if (error == 0) {
1729 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1730 	    jrecord_init(jo, &jrec, -1);
1731 	    save = jrecord_push(&jrec, JTYPE_SETATTR);
1732 	    jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1733 	    jrecord_write_vnode_ref(&jrec, ap->a_vp);
1734 	    jrecord_write_vattr(&jrec, ap->a_vap);
1735 	    jrecord_pop(&jrec, save);
1736 	    jrecord_done(&jrec, 0);
1737 	}
1738     }
1739     return (error);
1740 }
1741 
1742 /*
1743  * Journal vop_write { a_vp, a_uio, a_ioflag, a_cred }
1744  */
1745 static
1746 int
1747 journal_write(struct vop_write_args *ap)
1748 {
1749     struct mount *mp;
1750     struct journal *jo;
1751     struct jrecord jrec;
1752     struct uio uio_copy;
1753     struct iovec uio_one_iovec;
1754     void *save;		/* warning, save pointers do not always remain valid */
1755     int error;
1756 
1757     /*
1758      * This is really nasty.  UIO's don't retain sufficient information to
1759      * be reusable once they've gone through the VOP chain.  The iovecs get
1760      * cleared, so we have to copy the UIO.
1761      *
1762      * XXX fix the UIO code to not destroy iov's during a scan so we can
1763      *     reuse the uio over and over again.
1764      */
1765     uio_copy = *ap->a_uio;
1766     if (uio_copy.uio_iovcnt == 1) {
1767 	uio_one_iovec = ap->a_uio->uio_iov[0];
1768 	uio_copy.uio_iov = &uio_one_iovec;
1769     } else {
1770 	uio_copy.uio_iov = malloc(uio_copy.uio_iovcnt * sizeof(struct iovec),
1771 				    M_JOURNAL, M_WAITOK);
1772 	bcopy(ap->a_uio->uio_iov, uio_copy.uio_iov,
1773 		uio_copy.uio_iovcnt * sizeof(struct iovec));
1774     }
1775 
1776     error = vop_journal_operate_ap(&ap->a_head);
1777     mp = ap->a_head.a_ops->vv_mount;
1778     if (error == 0) {
1779 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1780 	    jrecord_init(jo, &jrec, -1);
1781 	    save = jrecord_push(&jrec, JTYPE_WRITE);
1782 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
1783 	    jrecord_write_vnode_ref(&jrec, ap->a_vp);
1784 	    jrecord_write_uio(&jrec, JLEAF_FILEDATA, &uio_copy);
1785 	    jrecord_pop(&jrec, save);
1786 	    jrecord_done(&jrec, 0);
1787 	}
1788     }
1789 
1790     if (uio_copy.uio_iov != &uio_one_iovec)
1791 	free(uio_copy.uio_iov, M_JOURNAL);
1792 
1793 
1794     return (error);
1795 }
1796 
1797 /*
1798  * Journal vop_fsync { a_vp, a_waitfor, a_td }
1799  */
1800 static
1801 int
1802 journal_fsync(struct vop_fsync_args *ap)
1803 {
1804     struct mount *mp;
1805     struct journal *jo;
1806     int error;
1807 
1808     error = vop_journal_operate_ap(&ap->a_head);
1809     mp = ap->a_head.a_ops->vv_mount;
1810     if (error == 0) {
1811 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1812 	    /* XXX synchronize pending journal records */
1813 	}
1814     }
1815     return (error);
1816 }
1817 
1818 /*
1819  * Journal vop_putpages { a_vp, a_m, a_count, a_sync, a_rtvals, a_offset }
1820  *
1821  * note: a_count is in bytes.
1822  */
1823 static
1824 int
1825 journal_putpages(struct vop_putpages_args *ap)
1826 {
1827     struct mount *mp;
1828     struct journal *jo;
1829     struct jrecord jrec;
1830     void *save;		/* warning, save pointers do not always remain valid */
1831     int error;
1832 
1833     error = vop_journal_operate_ap(&ap->a_head);
1834     mp = ap->a_head.a_ops->vv_mount;
1835     if (error == 0 && ap->a_count > 0) {
1836 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1837 	    jrecord_init(jo, &jrec, -1);
1838 	    save = jrecord_push(&jrec, JTYPE_PUTPAGES);
1839 	    jrecord_write_vnode_ref(&jrec, ap->a_vp);
1840 	    jrecord_write_pagelist(&jrec, JLEAF_FILEDATA,
1841 			ap->a_m, ap->a_rtvals, btoc(ap->a_count), ap->a_offset);
1842 	    jrecord_pop(&jrec, save);
1843 	    jrecord_done(&jrec, 0);
1844 	}
1845     }
1846     return (error);
1847 }
1848 
1849 /*
1850  * Journal vop_setacl { a_vp, a_type, a_aclp, a_cred, a_td }
1851  */
1852 static
1853 int
1854 journal_setacl(struct vop_setacl_args *ap)
1855 {
1856     struct mount *mp;
1857     struct journal *jo;
1858     struct jrecord jrec;
1859     void *save;		/* warning, save pointers do not always remain valid */
1860     int error;
1861 
1862     error = vop_journal_operate_ap(&ap->a_head);
1863     mp = ap->a_head.a_ops->vv_mount;
1864     if (error == 0) {
1865 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1866 	    jrecord_init(jo, &jrec, -1);
1867 	    save = jrecord_push(&jrec, JTYPE_SETACL);
1868 	    jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1869 	    jrecord_write_vnode_ref(&jrec, ap->a_vp);
1870 	    /* XXX type, aclp */
1871 	    jrecord_pop(&jrec, save);
1872 	    jrecord_done(&jrec, 0);
1873 	}
1874     }
1875     return (error);
1876 }
1877 
1878 /*
1879  * Journal vop_setextattr { a_vp, a_name, a_uio, a_cred, a_td }
1880  */
1881 static
1882 int
1883 journal_setextattr(struct vop_setextattr_args *ap)
1884 {
1885     struct mount *mp;
1886     struct journal *jo;
1887     struct jrecord jrec;
1888     void *save;		/* warning, save pointers do not always remain valid */
1889     int error;
1890 
1891     error = vop_journal_operate_ap(&ap->a_head);
1892     mp = ap->a_head.a_ops->vv_mount;
1893     if (error == 0) {
1894 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1895 	    jrecord_init(jo, &jrec, -1);
1896 	    save = jrecord_push(&jrec, JTYPE_SETEXTATTR);
1897 	    jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1898 	    jrecord_write_vnode_ref(&jrec, ap->a_vp);
1899 	    jrecord_leaf(&jrec, JLEAF_ATTRNAME, ap->a_name, strlen(ap->a_name));
1900 	    jrecord_write_uio(&jrec, JLEAF_FILEDATA, ap->a_uio);
1901 	    jrecord_pop(&jrec, save);
1902 	    jrecord_done(&jrec, 0);
1903 	}
1904     }
1905     return (error);
1906 }
1907 
1908 /*
1909  * Journal vop_ncreate { a_ncp, a_vpp, a_cred, a_vap }
1910  */
1911 static
1912 int
1913 journal_ncreate(struct vop_ncreate_args *ap)
1914 {
1915     struct mount *mp;
1916     struct journal *jo;
1917     struct jrecord jrec;
1918     void *save;		/* warning, save pointers do not always remain valid */
1919     int error;
1920 
1921     error = vop_journal_operate_ap(&ap->a_head);
1922     mp = ap->a_head.a_ops->vv_mount;
1923     if (error == 0) {
1924 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1925 	    jrecord_init(jo, &jrec, -1);
1926 	    save = jrecord_push(&jrec, JTYPE_CREATE);
1927 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
1928 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1929 	    if (*ap->a_vpp)
1930 		jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
1931 	    jrecord_pop(&jrec, save);
1932 	    jrecord_done(&jrec, 0);
1933 	}
1934     }
1935     return (error);
1936 }
1937 
1938 /*
1939  * Journal vop_nmknod { a_ncp, a_vpp, a_cred, a_vap }
1940  */
1941 static
1942 int
1943 journal_nmknod(struct vop_nmknod_args *ap)
1944 {
1945     struct mount *mp;
1946     struct journal *jo;
1947     struct jrecord jrec;
1948     void *save;		/* warning, save pointers do not always remain valid */
1949     int error;
1950 
1951     error = vop_journal_operate_ap(&ap->a_head);
1952     mp = ap->a_head.a_ops->vv_mount;
1953     if (error == 0) {
1954 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1955 	    jrecord_init(jo, &jrec, -1);
1956 	    save = jrecord_push(&jrec, JTYPE_MKNOD);
1957 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
1958 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1959 	    jrecord_write_vattr(&jrec, ap->a_vap);
1960 	    if (*ap->a_vpp)
1961 		jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
1962 	    jrecord_pop(&jrec, save);
1963 	    jrecord_done(&jrec, 0);
1964 	}
1965     }
1966     return (error);
1967 }
1968 
1969 /*
1970  * Journal vop_nlink { a_ncp, a_vp, a_cred }
1971  */
1972 static
1973 int
1974 journal_nlink(struct vop_nlink_args *ap)
1975 {
1976     struct mount *mp;
1977     struct journal *jo;
1978     struct jrecord jrec;
1979     void *save;		/* warning, save pointers do not always remain valid */
1980     int error;
1981 
1982     error = vop_journal_operate_ap(&ap->a_head);
1983     mp = ap->a_head.a_ops->vv_mount;
1984     if (error == 0) {
1985 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1986 	    jrecord_init(jo, &jrec, -1);
1987 	    save = jrecord_push(&jrec, JTYPE_LINK);
1988 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
1989 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1990 	    jrecord_write_vnode_ref(&jrec, ap->a_vp);
1991 	    /* XXX PATH to VP and inode number */
1992 	    jrecord_pop(&jrec, save);
1993 	    jrecord_done(&jrec, 0);
1994 	}
1995     }
1996     return (error);
1997 }
1998 
1999 /*
2000  * Journal vop_symlink { a_ncp, a_vpp, a_cred, a_vap, a_target }
2001  */
2002 static
2003 int
2004 journal_nsymlink(struct vop_nsymlink_args *ap)
2005 {
2006     struct mount *mp;
2007     struct journal *jo;
2008     struct jrecord jrec;
2009     void *save;		/* warning, save pointers do not always remain valid */
2010     int error;
2011 
2012     error = vop_journal_operate_ap(&ap->a_head);
2013     mp = ap->a_head.a_ops->vv_mount;
2014     if (error == 0) {
2015 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2016 	    jrecord_init(jo, &jrec, -1);
2017 	    save = jrecord_push(&jrec, JTYPE_SYMLINK);
2018 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
2019 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2020 	    jrecord_leaf(&jrec, JLEAF_SYMLINKDATA,
2021 			ap->a_target, strlen(ap->a_target));
2022 	    if (*ap->a_vpp)
2023 		jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
2024 	    jrecord_pop(&jrec, save);
2025 	    jrecord_done(&jrec, 0);
2026 	}
2027     }
2028     return (error);
2029 }
2030 
2031 /*
2032  * Journal vop_nwhiteout { a_ncp, a_cred, a_flags }
2033  */
2034 static
2035 int
2036 journal_nwhiteout(struct vop_nwhiteout_args *ap)
2037 {
2038     struct mount *mp;
2039     struct journal *jo;
2040     struct jrecord jrec;
2041     void *save;		/* warning, save pointers do not always remain valid */
2042     int error;
2043 
2044     error = vop_journal_operate_ap(&ap->a_head);
2045     mp = ap->a_head.a_ops->vv_mount;
2046     if (error == 0) {
2047 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2048 	    jrecord_init(jo, &jrec, -1);
2049 	    save = jrecord_push(&jrec, JTYPE_WHITEOUT);
2050 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
2051 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2052 	    jrecord_pop(&jrec, save);
2053 	    jrecord_done(&jrec, 0);
2054 	}
2055     }
2056     return (error);
2057 }
2058 
2059 /*
2060  * Journal vop_nremove { a_ncp, a_cred }
2061  */
2062 static
2063 int
2064 journal_nremove(struct vop_nremove_args *ap)
2065 {
2066     struct mount *mp;
2067     struct journal *jo;
2068     struct jrecord jrec;
2069     void *save;		/* warning, save pointers do not always remain valid */
2070     int error;
2071 
2072     error = vop_journal_operate_ap(&ap->a_head);
2073     mp = ap->a_head.a_ops->vv_mount;
2074     if (error == 0) {
2075 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2076 	    jrecord_init(jo, &jrec, -1);
2077 	    save = jrecord_push(&jrec, JTYPE_REMOVE);
2078 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
2079 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2080 	    jrecord_pop(&jrec, save);
2081 	    jrecord_done(&jrec, 0);
2082 	}
2083     }
2084     return (error);
2085 }
2086 
2087 /*
2088  * Journal vop_nmkdir { a_ncp, a_vpp, a_cred, a_vap }
2089  */
2090 static
2091 int
2092 journal_nmkdir(struct vop_nmkdir_args *ap)
2093 {
2094     struct mount *mp;
2095     struct journal *jo;
2096     struct jrecord jrec;
2097     void *save;		/* warning, save pointers do not always remain valid */
2098     int error;
2099 
2100     error = vop_journal_operate_ap(&ap->a_head);
2101     mp = ap->a_head.a_ops->vv_mount;
2102     if (error == 0) {
2103 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2104 	    jrecord_init(jo, &jrec, -1);
2105 	    if (jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
2106 		save = jrecord_push(&jrec, JTYPE_UNDO);
2107 		/* XXX undo operations */
2108 		jrecord_pop(&jrec, save);
2109 	    }
2110 #if 0
2111 	    if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
2112 		jrecord_write_audit(&jrec);
2113 	    }
2114 #endif
2115 	    save = jrecord_push(&jrec, JTYPE_MKDIR);
2116 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2117 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
2118 	    jrecord_write_vattr(&jrec, ap->a_vap);
2119 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2120 	    if (*ap->a_vpp)
2121 		jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
2122 	    jrecord_pop(&jrec, save);
2123 	    jrecord_done(&jrec, 0);
2124 	}
2125     }
2126     return (error);
2127 }
2128 
2129 /*
2130  * Journal vop_nrmdir { a_ncp, a_cred }
2131  */
2132 static
2133 int
2134 journal_nrmdir(struct vop_nrmdir_args *ap)
2135 {
2136     struct mount *mp;
2137     struct journal *jo;
2138     struct jrecord jrec;
2139     void *save;		/* warning, save pointers do not always remain valid */
2140     int error;
2141 
2142     error = vop_journal_operate_ap(&ap->a_head);
2143     mp = ap->a_head.a_ops->vv_mount;
2144     if (error == 0) {
2145 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2146 	    jrecord_init(jo, &jrec, -1);
2147 	    save = jrecord_push(&jrec, JTYPE_RMDIR);
2148 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
2149 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2150 	    jrecord_pop(&jrec, save);
2151 	    jrecord_done(&jrec, 0);
2152 	}
2153     }
2154     return (error);
2155 }
2156 
2157 /*
2158  * Journal vop_nrename { a_fncp, a_tncp, a_cred }
2159  */
2160 static
2161 int
2162 journal_nrename(struct vop_nrename_args *ap)
2163 {
2164     struct mount *mp;
2165     struct journal *jo;
2166     struct jrecord jrec;
2167     void *save;		/* warning, save pointers do not always remain valid */
2168     int error;
2169 
2170     error = vop_journal_operate_ap(&ap->a_head);
2171     mp = ap->a_head.a_ops->vv_mount;
2172     if (error == 0) {
2173 	TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2174 	    jrecord_init(jo, &jrec, -1);
2175 	    save = jrecord_push(&jrec, JTYPE_RENAME);
2176 	    jrecord_write_cred(&jrec, NULL, ap->a_cred);
2177 	    jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_fncp);
2178 	    jrecord_write_path(&jrec, JLEAF_PATH2, ap->a_tncp);
2179 	    jrecord_pop(&jrec, save);
2180 	    jrecord_done(&jrec, 0);
2181 	}
2182     }
2183     return (error);
2184 }
2185 
2186