xref: /dflybsd-src/sbin/hammer/cmd_mirror.c (revision d54592ee9e96c920b951af2e00cd72c0081ccae3)
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.12 2008/07/31 06:01:31 dillon Exp $
35  */
36 
37 #include "hammer.h"
38 
39 #define SERIALBUF_SIZE	(512 * 1024)
40 
41 static int read_mrecords(int fd, char *buf, u_int size,
42 			 hammer_ioc_mrecord_head_t pickup);
43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
44 			 hammer_ioc_mrecord_head_t pickup);
45 static void write_mrecord(int fdout, u_int32_t type,
46 			 hammer_ioc_mrecord_any_t mrec, int bytes);
47 static void generate_mrec_header(int fd, int fdout, int pfs_id,
48 			 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
50 			 struct hammer_ioc_mrecord_head *pickup,
51 			 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
53 static ssize_t writebw(int fd, const void *buf, size_t nbytes,
54 			u_int64_t *bwcount, struct timeval *tv1);
55 static void mirror_usage(int code);
56 
57 /*
58  * Generate a mirroring data stream from the specific source over the
59  * entire key range, but restricted to the specified transaction range.
60  *
61  * The HAMMER VFS does most of the work, we add a few new mrecord
62  * types to negotiate the TID ranges and verify that the entire
63  * stream made it to the destination.
64  */
65 void
66 hammer_cmd_mirror_read(char **av, int ac, int streaming)
67 {
68 	struct hammer_ioc_mirror_rw mirror;
69 	struct hammer_ioc_pseudofs_rw pfs;
70 	union hammer_ioc_mrecord_any mrec_tmp;
71 	struct hammer_ioc_mrecord_head pickup;
72 	hammer_ioc_mrecord_any_t mrec;
73 	hammer_tid_t sync_tid;
74 	const char *filesystem;
75 	char *buf = malloc(SERIALBUF_SIZE);
76 	int interrupted = 0;
77 	int error;
78 	int fd;
79 	int n;
80 	int didwork;
81 	int64_t total_bytes;
82 	time_t base_t = time(NULL);
83 	struct timeval bwtv;
84 	u_int64_t bwcount;
85 
86 	if (ac > 2)
87 		mirror_usage(1);
88 	filesystem = av[0];
89 
90 	pickup.signature = 0;
91 	pickup.type = 0;
92 
93 again:
94 	bzero(&mirror, sizeof(mirror));
95 	hammer_key_beg_init(&mirror.key_beg);
96 	hammer_key_end_init(&mirror.key_end);
97 
98 	fd = getpfs(&pfs, filesystem);
99 
100 	if (streaming && VerboseOpt) {
101 		fprintf(stderr, "\nRunning");
102 		fflush(stderr);
103 	}
104 	total_bytes = 0;
105 	gettimeofday(&bwtv, NULL);
106 	bwcount = 0;
107 
108 	/*
109 	 * In 2-way mode the target will send us a PFS info packet
110 	 * first.  Use the target's current snapshot TID as our default
111 	 * begin TID.
112 	 */
113 	mirror.tid_beg = 0;
114 	if (TwoWayPipeOpt) {
115 		n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup,
116 					 NULL, &mirror.tid_beg);
117 		if (n < 0) {	/* got TERM record */
118 			relpfs(fd, &pfs);
119 			return;
120 		}
121 		++mirror.tid_beg;
122 	}
123 
124 	/*
125 	 * Write out the PFS header, tid_beg will be updated if our PFS
126 	 * has a larger begin sync.  tid_end is set to the latest source
127 	 * TID whos flush cycle has completed.
128 	 */
129 	generate_mrec_header(fd, 1, pfs.pfs_id,
130 			     &mirror.tid_beg, &mirror.tid_end);
131 
132 	/* XXX streaming mode support w/ cycle or command line arg */
133 	/*
134 	 * A cycle file overrides the beginning TID
135 	 */
136 	hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
137 
138 	if (ac == 2)
139 		mirror.tid_beg = strtoull(av[1], NULL, 0);
140 
141 	if (streaming == 0 || VerboseOpt >= 2) {
142 		fprintf(stderr,
143 			"Mirror-read: Mirror from %016llx to %016llx\n",
144 			mirror.tid_beg, mirror.tid_end);
145 	}
146 	if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
147 		fprintf(stderr, "Mirror-read: Resuming at object %016llx\n",
148 			mirror.key_beg.obj_id);
149 	}
150 
151 	/*
152 	 * Nothing to do if begin equals end.
153 	 */
154 	if (mirror.tid_beg >= mirror.tid_end) {
155 		if (streaming == 0 || VerboseOpt >= 2)
156 			fprintf(stderr, "Mirror-read: No work to do\n");
157 		didwork = 0;
158 		goto done;
159 	}
160 	didwork = 1;
161 
162 	/*
163 	 * Write out bulk records
164 	 */
165 	mirror.ubuf = buf;
166 	mirror.size = SERIALBUF_SIZE;
167 
168 	do {
169 		mirror.count = 0;
170 		mirror.pfs_id = pfs.pfs_id;
171 		mirror.shared_uuid = pfs.ondisk->shared_uuid;
172 		if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
173 			fprintf(stderr, "Mirror-read %s failed: %s\n",
174 				filesystem, strerror(errno));
175 			exit(1);
176 		}
177 		if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
178 			fprintf(stderr,
179 				"Mirror-read %s fatal error %d\n",
180 				filesystem, mirror.head.error);
181 			exit(1);
182 		}
183 		if (mirror.count) {
184 			if (BandwidthOpt) {
185 				n = writebw(1, mirror.ubuf, mirror.count,
186 					    &bwcount, &bwtv);
187 			} else {
188 				n = write(1, mirror.ubuf, mirror.count);
189 			}
190 			if (n != mirror.count) {
191 				fprintf(stderr, "Mirror-read %s failed: "
192 						"short write\n",
193 				filesystem);
194 				exit(1);
195 			}
196 		}
197 		total_bytes += mirror.count;
198 		if (streaming && VerboseOpt) {
199 			fprintf(stderr, "\r%016llx %11lld",
200 				mirror.key_cur.obj_id,
201 				total_bytes);
202 			fflush(stderr);
203 		}
204 		mirror.key_beg = mirror.key_cur;
205 		if (TimeoutOpt &&
206 		    (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
207 			fprintf(stderr,
208 				"Mirror-read %s interrupted by timer at"
209 				" %016llx\n",
210 				filesystem,
211 				mirror.key_cur.obj_id);
212 			interrupted = 1;
213 			break;
214 		}
215 	} while (mirror.count != 0);
216 
217 done:
218 	/*
219 	 * Write out the termination sync record - only if not interrupted
220 	 */
221 	if (interrupted == 0) {
222 		if (didwork) {
223 			write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
224 				      &mrec_tmp, sizeof(mrec_tmp.sync));
225 		} else {
226 			write_mrecord(1, HAMMER_MREC_TYPE_IDLE,
227 				      &mrec_tmp, sizeof(mrec_tmp.sync));
228 		}
229 	}
230 
231 	/*
232 	 * If the -2 option was given (automatic when doing mirror-copy),
233 	 * a two-way pipe is assumed and we expect a response mrec from
234 	 * the target.
235 	 */
236 	if (TwoWayPipeOpt) {
237 		mrec = read_mrecord(0, &error, &pickup);
238 		if (mrec == NULL ||
239 		    mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
240 		    mrec->head.rec_size != sizeof(mrec->update)) {
241 			fprintf(stderr, "mirror_read: Did not get final "
242 					"acknowledgement packet from target\n");
243 			exit(1);
244 		}
245 		if (interrupted) {
246 			if (CyclePath) {
247 				hammer_set_cycle(&mirror.key_cur, mirror.tid_beg);
248 				fprintf(stderr, "Cyclefile %s updated for continuation\n", CyclePath);
249 			}
250 		} else {
251 			sync_tid = mrec->update.tid;
252 			if (CyclePath) {
253 				hammer_key_beg_init(&mirror.key_beg);
254 				hammer_set_cycle(&mirror.key_beg, sync_tid);
255 				fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n",
256 					CyclePath, sync_tid);
257 			}
258 		}
259 	} else if (CyclePath) {
260 		/* NOTE! mirror.tid_beg cannot be updated */
261 		fprintf(stderr, "Warning: cycle file (-c option) cannot be "
262 				"fully updated unless you use mirror-copy\n");
263 		hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
264 	}
265 	if (streaming && interrupted == 0) {
266 		time_t t1 = time(NULL);
267 		time_t t2;
268 
269 		if (VerboseOpt) {
270 			fprintf(stderr, " W");
271 			fflush(stderr);
272 		}
273 		pfs.ondisk->sync_end_tid = mirror.tid_end;
274 		if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) {
275 			fprintf(stderr, "Mirror-read %s: cannot stream: %s\n",
276 				filesystem, strerror(errno));
277 		} else {
278 			t2 = time(NULL) - t1;
279 			if (t2 >= 0 && t2 < DelayOpt) {
280 				if (VerboseOpt) {
281 					fprintf(stderr, "\bD");
282 					fflush(stderr);
283 				}
284 				sleep(DelayOpt - t2);
285 			}
286 			if (VerboseOpt) {
287 				fprintf(stderr, "\b ");
288 				fflush(stderr);
289 			}
290 			relpfs(fd, &pfs);
291 			goto again;
292 		}
293 	}
294 	write_mrecord(1, HAMMER_MREC_TYPE_TERM,
295 		      &mrec_tmp, sizeof(mrec_tmp.sync));
296 	relpfs(fd, &pfs);
297 	fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
298 }
299 
300 /*
301  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
302  * some additional packet types to negotiate TID ranges and to verify
303  * completion.  The HAMMER VFS does most of the work.
304  *
305  * It is important to note that the mirror.key_{beg,end} range must
306  * match the ranged used by the original.  For now both sides use
307  * range the entire key space.
308  *
309  * It is even more important that the records in the stream conform
310  * to the TID range also supplied in the stream.  The HAMMER VFS will
311  * use the REC, PASS, and SKIP record types to track the portions of
312  * the B-Tree being scanned in order to be able to proactively delete
313  * records on the target within those active areas that are not mentioned
314  * by the source.
315  *
316  * The mirror.key_cur field is used by the VFS to do this tracking.  It
317  * must be initialized to key_beg but then is persistently updated by
318  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
319  * field you will blow up the mirror target, possibly to the point of
320  * deleting everything.  As a safety measure the HAMMER VFS simply marks
321  * the records that the source has destroyed as deleted on the target,
322  * and normal pruning operations will deal with their final disposition
323  * at some later time.
324  */
325 void
326 hammer_cmd_mirror_write(char **av, int ac)
327 {
328 	struct hammer_ioc_mirror_rw mirror;
329 	const char *filesystem;
330 	char *buf = malloc(SERIALBUF_SIZE);
331 	struct hammer_ioc_pseudofs_rw pfs;
332 	struct hammer_ioc_mrecord_head pickup;
333 	struct hammer_ioc_synctid synctid;
334 	union hammer_ioc_mrecord_any mrec_tmp;
335 	hammer_ioc_mrecord_any_t mrec;
336 	int error;
337 	int fd;
338 	int n;
339 
340 	if (ac > 2)
341 		mirror_usage(1);
342 	filesystem = av[0];
343 
344 	pickup.signature = 0;
345 	pickup.type = 0;
346 
347 again:
348 	bzero(&mirror, sizeof(mirror));
349 	hammer_key_beg_init(&mirror.key_beg);
350 	hammer_key_end_init(&mirror.key_end);
351 	mirror.key_end = mirror.key_beg;
352 
353 	fd = getpfs(&pfs, filesystem);
354 
355 	/*
356 	 * In two-way mode the target writes out a PFS packet first.
357 	 * The source uses our tid_end as its tid_beg by default,
358 	 * picking up where it left off.
359 	 */
360 	mirror.tid_beg = 0;
361 	if (TwoWayPipeOpt) {
362 		generate_mrec_header(fd, 1, pfs.pfs_id,
363 				     &mirror.tid_beg, &mirror.tid_end);
364 	}
365 
366 	/*
367 	 * Read and process the PFS header.  The source informs us of
368 	 * the TID range the stream represents.
369 	 */
370 	n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup,
371 				 &mirror.tid_beg, &mirror.tid_end);
372 	if (n < 0) {	/* got TERM record */
373 		relpfs(fd, &pfs);
374 		return;
375 	}
376 
377 	mirror.ubuf = buf;
378 	mirror.size = SERIALBUF_SIZE;
379 
380 	/*
381 	 * Read and process bulk records (REC, PASS, and SKIP types).
382 	 *
383 	 * On your life, do NOT mess with mirror.key_cur or your mirror
384 	 * target may become history.
385 	 */
386 	for (;;) {
387 		mirror.count = 0;
388 		mirror.pfs_id = pfs.pfs_id;
389 		mirror.shared_uuid = pfs.ondisk->shared_uuid;
390 		mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
391 		if (mirror.size <= 0)
392 			break;
393 		if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
394 			fprintf(stderr, "Mirror-write %s failed: %s\n",
395 				filesystem, strerror(errno));
396 			exit(1);
397 		}
398 		if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
399 			fprintf(stderr,
400 				"Mirror-write %s fatal error %d\n",
401 				filesystem, mirror.head.error);
402 			exit(1);
403 		}
404 #if 0
405 		if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
406 			fprintf(stderr,
407 				"Mirror-write %s interrupted by timer at"
408 				" %016llx\n",
409 				filesystem,
410 				mirror.key_cur.obj_id);
411 			exit(0);
412 		}
413 #endif
414 	}
415 
416 	/*
417 	 * Read and process the termination sync record.
418 	 */
419 	mrec = read_mrecord(0, &error, &pickup);
420 
421 	if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
422 		fprintf(stderr, "Mirror-write: received termination request\n");
423 		free(mrec);
424 		return;
425 	}
426 
427 	if (mrec == NULL ||
428 	    (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
429 	     mrec->head.type != HAMMER_MREC_TYPE_IDLE) ||
430 	    mrec->head.rec_size != sizeof(mrec->sync)) {
431 		fprintf(stderr, "Mirror-write %s: Did not get termination "
432 				"sync record, or rec_size is wrong rt=%d\n",
433 				filesystem, mrec->head.type);
434 		exit(1);
435 	}
436 
437 	/*
438 	 * Update the PFS info on the target so the user has visibility
439 	 * into the new snapshot, and sync the target filesystem.
440 	 */
441 	if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) {
442 		update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
443 
444 		bzero(&synctid, sizeof(synctid));
445 		synctid.op = HAMMER_SYNCTID_SYNC2;
446 		ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
447 
448 		if (VerboseOpt >= 2) {
449 			fprintf(stderr, "Mirror-write %s: succeeded\n",
450 				filesystem);
451 		}
452 	}
453 
454 	free(mrec);
455 	mrec = NULL;
456 
457 	/*
458 	 * Report back to the originator.
459 	 */
460 	if (TwoWayPipeOpt) {
461 		mrec_tmp.update.tid = mirror.tid_end;
462 		write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
463 			      &mrec_tmp, sizeof(mrec_tmp.update));
464 	} else {
465 		printf("Source can update synctid to 0x%016llx\n",
466 		       mirror.tid_end);
467 	}
468 	relpfs(fd, &pfs);
469 	goto again;
470 }
471 
472 void
473 hammer_cmd_mirror_dump(void)
474 {
475 	char *buf = malloc(SERIALBUF_SIZE);
476 	struct hammer_ioc_mrecord_head pickup;
477 	hammer_ioc_mrecord_any_t mrec;
478 	int error;
479 	int size;
480 	int offset;
481 	int bytes;
482 
483 	/*
484 	 * Read and process the PFS header
485 	 */
486 	pickup.signature = 0;
487 	pickup.type = 0;
488 
489 	mrec = read_mrecord(0, &error, &pickup);
490 
491 	/*
492 	 * Read and process bulk records
493 	 */
494 	for (;;) {
495 		size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
496 		if (size <= 0)
497 			break;
498 		offset = 0;
499 		while (offset < size) {
500 			mrec = (void *)((char *)buf + offset);
501 			bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
502 			if (offset + bytes > size) {
503 				fprintf(stderr, "Misaligned record\n");
504 				exit(1);
505 			}
506 
507 			switch(mrec->head.type) {
508 			case HAMMER_MREC_TYPE_REC:
509 				printf("Record obj=%016llx key=%016llx "
510 				       "rt=%02x ot=%02x\n",
511 					mrec->rec.leaf.base.obj_id,
512 					mrec->rec.leaf.base.key,
513 					mrec->rec.leaf.base.rec_type,
514 					mrec->rec.leaf.base.obj_type);
515 				printf("       tids %016llx:%016llx data=%d\n",
516 					mrec->rec.leaf.base.create_tid,
517 					mrec->rec.leaf.base.delete_tid,
518 					mrec->rec.leaf.data_len);
519 				break;
520 			case HAMMER_MREC_TYPE_PASS:
521 				printf("Pass   obj=%016llx key=%016llx "
522 				       "rt=%02x ot=%02x\n",
523 					mrec->rec.leaf.base.obj_id,
524 					mrec->rec.leaf.base.key,
525 					mrec->rec.leaf.base.rec_type,
526 					mrec->rec.leaf.base.obj_type);
527 				printf("       tids %016llx:%016llx data=%d\n",
528 					mrec->rec.leaf.base.create_tid,
529 					mrec->rec.leaf.base.delete_tid,
530 					mrec->rec.leaf.data_len);
531 				break;
532 			case HAMMER_MREC_TYPE_SKIP:
533 				printf("Skip   obj=%016llx key=%016llx rt=%02x to\n"
534 				       "       obj=%016llx key=%016llx rt=%02x\n",
535 				       mrec->skip.skip_beg.obj_id,
536 				       mrec->skip.skip_beg.key,
537 				       mrec->skip.skip_beg.rec_type,
538 				       mrec->skip.skip_end.obj_id,
539 				       mrec->skip.skip_end.key,
540 				       mrec->skip.skip_end.rec_type);
541 			default:
542 				break;
543 			}
544 			offset += bytes;
545 		}
546 	}
547 
548 	/*
549 	 * Read and process the termination sync record.
550 	 */
551 	mrec = read_mrecord(0, &error, &pickup);
552 	if (mrec == NULL ||
553 	    (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
554 	     mrec->head.type != HAMMER_MREC_TYPE_IDLE)
555 	 ) {
556 		fprintf(stderr, "Mirror-dump: Did not get termination "
557 				"sync record\n");
558 	}
559 }
560 
561 void
562 hammer_cmd_mirror_copy(char **av, int ac, int streaming)
563 {
564 	pid_t pid1;
565 	pid_t pid2;
566 	int fds[2];
567 	const char *xav[16];
568 	char tbuf[16];
569 	char *ptr;
570 	int xac;
571 
572 	if (ac != 2)
573 		mirror_usage(1);
574 
575 	if (pipe(fds) < 0) {
576 		perror("pipe");
577 		exit(1);
578 	}
579 
580 	TwoWayPipeOpt = 1;
581 
582 	/*
583 	 * Source
584 	 */
585 	if ((pid1 = fork()) == 0) {
586 		dup2(fds[0], 0);
587 		dup2(fds[0], 1);
588 		close(fds[0]);
589 		close(fds[1]);
590 		if ((ptr = strchr(av[0], ':')) != NULL) {
591 			*ptr++ = 0;
592 			xac = 0;
593 			xav[xac++] = "ssh";
594 			xav[xac++] = av[0];
595 			xav[xac++] = "hammer";
596 
597 			switch(VerboseOpt) {
598 			case 0:
599 				break;
600 			case 1:
601 				xav[xac++] = "-v";
602 				break;
603 			case 2:
604 				xav[xac++] = "-vv";
605 				break;
606 			default:
607 				xav[xac++] = "-vvv";
608 				break;
609 			}
610 			xav[xac++] = "-2";
611 			if (TimeoutOpt) {
612 				snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
613 				xav[xac++] = "-t";
614 				xav[xac++] = tbuf;
615 			}
616 			if (streaming)
617 				xav[xac++] = "mirror-read-streaming";
618 			else
619 				xav[xac++] = "mirror-read";
620 			xav[xac++] = ptr;
621 			xav[xac++] = NULL;
622 			execv("/usr/bin/ssh", (void *)xav);
623 		} else {
624 			hammer_cmd_mirror_read(av, 1, streaming);
625 			fflush(stdout);
626 			fflush(stderr);
627 		}
628 		_exit(1);
629 	}
630 
631 	/*
632 	 * Target
633 	 */
634 	if ((pid2 = fork()) == 0) {
635 		dup2(fds[1], 0);
636 		dup2(fds[1], 1);
637 		close(fds[0]);
638 		close(fds[1]);
639 		if ((ptr = strchr(av[1], ':')) != NULL) {
640 			*ptr++ = 0;
641 			xac = 0;
642 			xav[xac++] = "ssh";
643 			xav[xac++] = av[1];
644 			xav[xac++] = "hammer";
645 
646 			switch(VerboseOpt) {
647 			case 0:
648 				break;
649 			case 1:
650 				xav[xac++] = "-v";
651 				break;
652 			case 2:
653 				xav[xac++] = "-vv";
654 				break;
655 			default:
656 				xav[xac++] = "-vvv";
657 				break;
658 			}
659 
660 			xav[xac++] = "-2";
661 			xav[xac++] = "mirror-write";
662 			xav[xac++] = ptr;
663 			xav[xac++] = NULL;
664 			execv("/usr/bin/ssh", (void *)xav);
665 		} else {
666 			hammer_cmd_mirror_write(av + 1, 1);
667 			fflush(stdout);
668 			fflush(stderr);
669 		}
670 		_exit(1);
671 	}
672 	close(fds[0]);
673 	close(fds[1]);
674 
675 	while (waitpid(pid1, NULL, 0) <= 0)
676 		;
677 	while (waitpid(pid2, NULL, 0) <= 0)
678 		;
679 }
680 
681 /*
682  * Read and return multiple mrecords
683  */
684 static int
685 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
686 {
687 	hammer_ioc_mrecord_any_t mrec;
688 	u_int count;
689 	size_t n;
690 	size_t i;
691 	size_t bytes;
692 
693 	count = 0;
694 	while (size - count >= HAMMER_MREC_HEADSIZE) {
695 		/*
696 		 * Cached the record header in case we run out of buffer
697 		 * space.
698 		 */
699 		fflush(stdout);
700 		if (pickup->signature == 0) {
701 			for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
702 				i = read(fd, (char *)pickup + n,
703 					 HAMMER_MREC_HEADSIZE - n);
704 				if (i <= 0)
705 					break;
706 			}
707 			if (n == 0)
708 				break;
709 			if (n != HAMMER_MREC_HEADSIZE) {
710 				fprintf(stderr, "read_mrecords: short read on pipe\n");
711 				exit(1);
712 			}
713 
714 			if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
715 				fprintf(stderr, "read_mrecords: malformed record on pipe, bad signature\n");
716 				exit(1);
717 			}
718 		}
719 		if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
720 		    pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
721 			fprintf(stderr, "read_mrecords: malformed record on pipe, illegal rec_size\n");
722 			exit(1);
723 		}
724 
725 		/*
726 		 * Stop if we have insufficient space for the record and data.
727 		 */
728 		bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
729 		if (size - count < bytes)
730 			break;
731 
732 		/*
733 		 * Stop if the record type is not a REC or a SKIP (the only
734 		 * two types the ioctl supports.  Other types are used only
735 		 * by the userland protocol).
736 		 */
737 		if (pickup->type != HAMMER_MREC_TYPE_REC &&
738 		    pickup->type != HAMMER_MREC_TYPE_SKIP &&
739 		    pickup->type != HAMMER_MREC_TYPE_PASS) {
740 			break;
741 		}
742 
743 		/*
744 		 * Read the remainder and clear the pickup signature.
745 		 */
746 		for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
747 			i = read(fd, buf + count + n, bytes - n);
748 			if (i <= 0)
749 				break;
750 		}
751 		if (n != bytes) {
752 			fprintf(stderr, "read_mrecords: short read on pipe\n");
753 			exit(1);
754 		}
755 
756 		bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
757 		pickup->signature = 0;
758 		pickup->type = 0;
759 		mrec = (void *)(buf + count);
760 
761 		/*
762 		 * Validate the completed record
763 		 */
764 		if (mrec->head.rec_crc !=
765 		    crc32((char *)mrec + HAMMER_MREC_CRCOFF,
766 			  mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
767 			fprintf(stderr, "read_mrecords: malformed record "
768 					"on pipe, bad crc\n");
769 			exit(1);
770 		}
771 
772 		/*
773 		 * If its a B-Tree record validate the data crc
774 		 */
775 		if (mrec->head.type == HAMMER_MREC_TYPE_REC) {
776 			if (mrec->head.rec_size <
777 			    sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
778 				fprintf(stderr,
779 					"read_mrecords: malformed record on "
780 					"pipe, illegal element data_len\n");
781 				exit(1);
782 			}
783 			if (mrec->rec.leaf.data_len &&
784 			    mrec->rec.leaf.data_offset &&
785 			    hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
786 				fprintf(stderr,
787 					"read_mrecords: data_crc did not "
788 					"match data! obj=%016llx key=%016llx\n",
789 					mrec->rec.leaf.base.obj_id,
790 					mrec->rec.leaf.base.key);
791 				fprintf(stderr,
792 					"continuing, but there are problems\n");
793 			}
794 		}
795 		count += bytes;
796 	}
797 	return(count);
798 }
799 
800 /*
801  * Read and return a single mrecord.
802  */
803 static
804 hammer_ioc_mrecord_any_t
805 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
806 {
807 	hammer_ioc_mrecord_any_t mrec;
808 	struct hammer_ioc_mrecord_head mrechd;
809 	size_t bytes;
810 	size_t n;
811 	size_t i;
812 
813 	if (pickup && pickup->type != 0) {
814 		mrechd = *pickup;
815 		pickup->signature = 0;
816 		pickup->type = 0;
817 		n = HAMMER_MREC_HEADSIZE;
818 	} else {
819 		/*
820 		 * Read in the PFSD header from the sender.
821 		 */
822 		for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
823 			i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
824 			if (i <= 0)
825 				break;
826 		}
827 		if (n == 0) {
828 			*errorp = 0;	/* EOF */
829 			return(NULL);
830 		}
831 		if (n != HAMMER_MREC_HEADSIZE) {
832 			fprintf(stderr, "short read of mrecord header\n");
833 			*errorp = EPIPE;
834 			return(NULL);
835 		}
836 	}
837 	if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
838 		fprintf(stderr, "read_mrecord: bad signature\n");
839 		*errorp = EINVAL;
840 		return(NULL);
841 	}
842 	bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
843 	assert(bytes >= sizeof(mrechd));
844 	mrec = malloc(bytes);
845 	mrec->head = mrechd;
846 
847 	while (n < bytes) {
848 		i = read(fdin, (char *)mrec + n, bytes - n);
849 		if (i <= 0)
850 			break;
851 		n += i;
852 	}
853 	if (n != bytes) {
854 		fprintf(stderr, "read_mrecord: short read on payload\n");
855 		*errorp = EPIPE;
856 		return(NULL);
857 	}
858 	if (mrec->head.rec_crc !=
859 	    crc32((char *)mrec + HAMMER_MREC_CRCOFF,
860 		  mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
861 		fprintf(stderr, "read_mrecord: bad CRC\n");
862 		*errorp = EINVAL;
863 		return(NULL);
864 	}
865 	*errorp = 0;
866 	return(mrec);
867 }
868 
869 static
870 void
871 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec,
872 	      int bytes)
873 {
874 	char zbuf[HAMMER_HEAD_ALIGN];
875 	int pad;
876 
877 	pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
878 
879 	assert(bytes >= (int)sizeof(mrec->head));
880 	bzero(&mrec->head, sizeof(mrec->head));
881 	mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
882 	mrec->head.type = type;
883 	mrec->head.rec_size = bytes;
884 	mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF,
885 				   bytes - HAMMER_MREC_CRCOFF);
886 	if (write(fdout, mrec, bytes) != bytes) {
887 		fprintf(stderr, "write_mrecord: error %d (%s)\n",
888 			errno, strerror(errno));
889 		exit(1);
890 	}
891 	if (pad) {
892 		bzero(zbuf, pad);
893 		if (write(fdout, zbuf, pad) != pad) {
894 			fprintf(stderr, "write_mrecord: error %d (%s)\n",
895 				errno, strerror(errno));
896 			exit(1);
897 		}
898 	}
899 }
900 
901 /*
902  * Generate a mirroring header with the pfs information of the
903  * originating filesytem.
904  */
905 static void
906 generate_mrec_header(int fd, int fdout, int pfs_id,
907 		     hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
908 {
909 	struct hammer_ioc_pseudofs_rw pfs;
910 	union hammer_ioc_mrecord_any mrec_tmp;
911 
912 	bzero(&pfs, sizeof(pfs));
913 	bzero(&mrec_tmp, sizeof(mrec_tmp));
914 	pfs.pfs_id = pfs_id;
915 	pfs.ondisk = &mrec_tmp.pfs.pfsd;
916 	pfs.bytes = sizeof(mrec_tmp.pfs.pfsd);
917 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
918 		fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n");
919 		exit(1);
920 	}
921 	if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
922 		fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n");
923 		exit(1);
924 	}
925 
926 	/*
927 	 * sync_beg_tid - lowest TID on source after which a full history
928 	 *	 	  is available.
929 	 *
930 	 * sync_end_tid - highest fully synchronized TID from source.
931 	 */
932 	if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid)
933 		*tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid;
934 	if (tid_endp)
935 		*tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid;
936 	mrec_tmp.pfs.version = pfs.version;
937 	write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD,
938 		      &mrec_tmp, sizeof(mrec_tmp.pfs));
939 }
940 
941 /*
942  * Validate the pfs information from the originating filesystem
943  * against the target filesystem.  shared_uuid must match.
944  *
945  * return -1 if we got a TERM record
946  */
947 static int
948 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
949 		     struct hammer_ioc_mrecord_head *pickup,
950 		     hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
951 {
952 	struct hammer_ioc_pseudofs_rw pfs;
953 	struct hammer_pseudofs_data pfsd;
954 	hammer_ioc_mrecord_any_t mrec;
955 	int error;
956 
957 	/*
958 	 * Get the PFSD info from the target filesystem.
959 	 */
960 	bzero(&pfs, sizeof(pfs));
961 	bzero(&pfsd, sizeof(pfsd));
962 	pfs.pfs_id = pfs_id;
963 	pfs.ondisk = &pfsd;
964 	pfs.bytes = sizeof(pfsd);
965 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
966 		fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
967 		exit(1);
968 	}
969 	if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
970 		fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n");
971 		exit(1);
972 	}
973 
974 	mrec = read_mrecord(fdin, &error, pickup);
975 	if (mrec == NULL) {
976 		if (error == 0)
977 			fprintf(stderr, "validate_mrec_header: short read\n");
978 		exit(1);
979 	}
980 	if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
981 		free(mrec);
982 		return(-1);
983 	}
984 
985 	if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
986 		fprintf(stderr, "validate_mrec_header: did not get expected "
987 				"PFSD record type\n");
988 		exit(1);
989 	}
990 	if (mrec->head.rec_size != sizeof(mrec->pfs)) {
991 		fprintf(stderr, "validate_mrec_header: unexpected payload "
992 				"size\n");
993 		exit(1);
994 	}
995 	if (mrec->pfs.version != pfs.version) {
996 		fprintf(stderr, "validate_mrec_header: Version mismatch\n");
997 		exit(1);
998 	}
999 
1000 	/*
1001 	 * Whew.  Ok, is the read PFS info compatible with the target?
1002 	 */
1003 	if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
1004 		 sizeof(pfsd.shared_uuid)) != 0) {
1005 		fprintf(stderr,
1006 			"mirror-write: source and target have "
1007 			"different shared-uuid's!\n");
1008 		exit(1);
1009 	}
1010 	if (is_target &&
1011 	    (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) {
1012 		fprintf(stderr, "mirror-write: target must be in slave mode\n");
1013 		exit(1);
1014 	}
1015 	if (tid_begp)
1016 		*tid_begp = mrec->pfs.pfsd.sync_beg_tid;
1017 	if (tid_endp)
1018 		*tid_endp = mrec->pfs.pfsd.sync_end_tid;
1019 	free(mrec);
1020 	return(0);
1021 }
1022 
1023 static void
1024 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
1025 {
1026 	struct hammer_ioc_pseudofs_rw pfs;
1027 	struct hammer_pseudofs_data pfsd;
1028 
1029 	bzero(&pfs, sizeof(pfs));
1030 	bzero(&pfsd, sizeof(pfsd));
1031 	pfs.pfs_id = pfs_id;
1032 	pfs.ondisk = &pfsd;
1033 	pfs.bytes = sizeof(pfsd);
1034 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1035 		perror("update_pfs_snapshot (read)");
1036 		exit(1);
1037 	}
1038 	if (pfsd.sync_end_tid != snapshot_tid) {
1039 		pfsd.sync_end_tid = snapshot_tid;
1040 		if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
1041 			perror("update_pfs_snapshot (rewrite)");
1042 			exit(1);
1043 		}
1044 		if (VerboseOpt >= 2) {
1045 			fprintf(stderr,
1046 				"Mirror-write: Completed, updated snapshot "
1047 				"to %016llx\n",
1048 				snapshot_tid);
1049 		}
1050 	}
1051 }
1052 
1053 /*
1054  * Bandwidth-limited write in chunks
1055  */
1056 static
1057 ssize_t
1058 writebw(int fd, const void *buf, size_t nbytes,
1059 	u_int64_t *bwcount, struct timeval *tv1)
1060 {
1061 	struct timeval tv2;
1062 	size_t n;
1063 	ssize_t r;
1064 	ssize_t a;
1065 	int usec;
1066 
1067 	a = 0;
1068 	r = 0;
1069 	while (nbytes) {
1070 		if (*bwcount + nbytes > BandwidthOpt)
1071 			n = BandwidthOpt - *bwcount;
1072 		else
1073 			n = nbytes;
1074 		if (n)
1075 			r = write(fd, buf, n);
1076 		if (r >= 0) {
1077 			a += r;
1078 			nbytes -= r;
1079 			buf = (const char *)buf + r;
1080 		}
1081 		if ((size_t)r != n)
1082 			break;
1083 		*bwcount += n;
1084 		if (*bwcount >= BandwidthOpt) {
1085 			gettimeofday(&tv2, NULL);
1086 			usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 +
1087 				(int)(tv2.tv_usec - tv1->tv_usec);
1088 			if (usec >= 0 && usec < 1000000)
1089 				usleep(1000000 - usec);
1090 			gettimeofday(tv1, NULL);
1091 			*bwcount -= BandwidthOpt;
1092 		}
1093 	}
1094 	return(a ? a : r);
1095 }
1096 
/*
 * Print the usage of the mirroring commands to stderr and exit with
 * the given status code.
 *
 * mirror-read accepts an optional begin TID as its second argument, and
 * mirror-read-streaming is the streaming variant of mirror-read.
 */
static void
mirror_usage(int code)
{
	fprintf(stderr,
		"hammer mirror-read <filesystem> [begin-tid]\n"
		"hammer mirror-read-streaming <filesystem> [begin-tid]\n"
		"hammer mirror-write <filesystem>\n"
		"hammer mirror-dump\n"
		"hammer mirror-copy [[user@]host:]fs [[user@]host:]fs\n"
	);
	exit(code);
}
1108