xref: /dflybsd-src/sbin/hammer/cmd_mirror.c (revision 1cf08730c23f1c205ebd18f51b9a91d805d262f9)
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 
#include "hammer.h"

#include <sys/wait.h>
36 
37 #define LINE1	0,20
38 #define LINE2	20,78
39 #define LINE3	90,70
40 
41 #define SERIALBUF_SIZE	(512 * 1024)
42 
43 typedef struct histogram {
44 	hammer_tid_t	tid;
45 	uint64_t	bytes;
46 } *histogram_t;
47 
48 const char *ScoreBoardFile;
49 const char *RestrictTarget;
50 
51 static int read_mrecords(int fd, char *buf, u_int size,
52 			 hammer_ioc_mrecord_head_t pickup);
53 static int generate_histogram(int fd, const char *filesystem,
54 			 histogram_t *histogram_ary,
55 			 struct hammer_ioc_mirror_rw *mirror_base,
56 			 int *repeatp);
57 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
58 			 hammer_ioc_mrecord_head_t pickup);
59 static void write_mrecord(int fdout, uint32_t type,
60 			 hammer_ioc_mrecord_any_t mrec, int bytes);
61 static void generate_mrec_header(int fd, int pfs_id,
62 			 union hammer_ioc_mrecord_any *mrec_tmp);
63 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
64 			 struct hammer_ioc_mrecord_head *pickup,
65 			 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
66 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
67 static ssize_t writebw(int fd, const void *buf, size_t nbytes,
68 			uint64_t *bwcount, struct timeval *tv1);
69 static int getyntty(void);
70 static void score_printf(size_t i, size_t w, const char *ctl, ...);
71 static void hammer_check_restrict(const char *filesystem);
72 static void mirror_usage(int code);
73 
74 /*
75  * Generate a mirroring data stream from the specific source over the
76  * entire key range, but restricted to the specified transaction range.
77  *
78  * The HAMMER VFS does most of the work, we add a few new mrecord
79  * types to negotiate the TID ranges and verify that the entire
80  * stream made it to the destination.
81  *
82  * streaming will be 0 for mirror-read, 1 for mirror-stream.  The code will
83  * set up a fake value of -1 when running the histogram for mirror-read.
84  */
85 void
86 hammer_cmd_mirror_read(char **av, int ac, int streaming)
87 {
88 	struct hammer_ioc_mirror_rw mirror;
89 	struct hammer_ioc_pseudofs_rw pfs;
90 	union hammer_ioc_mrecord_any mrec_tmp;
91 	struct hammer_ioc_mrecord_head pickup;
92 	hammer_ioc_mrecord_any_t mrec;
93 	hammer_tid_t sync_tid;
94 	histogram_t histogram_ary;
95 	const char *filesystem;
96 	char *buf = malloc(SERIALBUF_SIZE);
97 	int interrupted = 0;
98 	int error;
99 	int fd;
100 	int n;
101 	int didwork;
102 	int histogram;
103 	int histindex;
104 	int histmax;
105 	int repeat = 0;
106 	int sameline;
107 	int64_t total_bytes;
108 	time_t base_t = time(NULL);
109 	struct timeval bwtv;
110 	uint64_t bwcount;
111 	uint64_t estbytes;
112 
113 	if (ac == 0 || ac > 2)
114 		mirror_usage(1);
115 	filesystem = av[0];
116 	hammer_check_restrict(filesystem);
117 
118 	pickup.signature = 0;
119 	pickup.type = 0;
120 	histogram = 0;
121 	histindex = 0;
122 	histmax = 0;
123 	histogram_ary = NULL;
124 	sameline = 0;
125 
126 again:
127 	bzero(&mirror, sizeof(mirror));
128 	hammer_key_beg_init(&mirror.key_beg);
129 	hammer_key_end_init(&mirror.key_end);
130 
131 	fd = getpfs(&pfs, filesystem);
132 
133 	if (streaming >= 0)
134 		score_printf(LINE1, "Running");
135 
136 	if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) {
137 		fprintf(stderr, "%cRunning  \b\b", (sameline ? '\r' : '\n'));
138 		fflush(stderr);
139 		sameline = 1;
140 	}
141 	sameline = 1;
142 	total_bytes = 0;
143 	gettimeofday(&bwtv, NULL);
144 	bwcount = 0;
145 
146 	/*
147 	 * Send initial header for the purpose of determining the
148 	 * shared-uuid.
149 	 */
150 	generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
151 	write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
152 		      &mrec_tmp, sizeof(mrec_tmp.pfs));
153 
154 	/*
155 	 * In 2-way mode the target will send us a PFS info packet
156 	 * first.  Use the target's current snapshot TID as our default
157 	 * begin TID.
158 	 */
159 	if (TwoWayPipeOpt) {
160 		mirror.tid_beg = 0;
161 		n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup,
162 					 NULL, &mirror.tid_beg);
163 		if (n < 0) {	/* got TERM record */
164 			relpfs(fd, &pfs);
165 			free(buf);
166 			free(histogram_ary);
167 			return;
168 		}
169 		++mirror.tid_beg;
170 	} else if (streaming && histogram) {
171 		mirror.tid_beg = histogram_ary[histindex].tid + 1;
172 	} else {
173 		mirror.tid_beg = 0;
174 	}
175 
176 	/*
177 	 * Write out the PFS header, tid_beg will be updated if our PFS
178 	 * has a larger begin sync.  tid_end is set to the latest source
179 	 * TID whos flush cycle has completed.
180 	 */
181 	generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
182 	if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid)
183 		mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid;
184 	mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid;
185 	mirror.ubuf = buf;
186 	mirror.size = SERIALBUF_SIZE;
187 	mirror.pfs_id = pfs.pfs_id;
188 	mirror.shared_uuid = pfs.ondisk->shared_uuid;
189 
190 	/*
191 	 * XXX If the histogram is exhausted and the TID delta is large
192 	 *     the stream might have been offline for a while and is
193 	 *     now picking it up again.  Do another histogram.
194 	 */
195 #if 0
196 	if (streaming && histogram && histindex == histend) {
197 		if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM)
198 			histogram = 0;
199 	}
200 #endif
201 
202 	/*
203 	 * Initial bulk startup control, try to do some incremental
204 	 * mirroring in order to allow the stream to be killed and
205 	 * restarted without having to start over.
206 	 */
207 	if (histogram == 0 && BulkOpt == 0) {
208 		if (VerboseOpt && repeat == 0) {
209 			fprintf(stderr, "\n");
210 			sameline = 0;
211 		}
212 		histmax = generate_histogram(fd, filesystem,
213 					     &histogram_ary, &mirror,
214 					     &repeat);
215 		histindex = 0;
216 		histogram = 1;
217 
218 		/*
219 		 * Just stream the histogram, then stop
220 		 */
221 		if (streaming == 0)
222 			streaming = -1;
223 	}
224 
225 	if (streaming && histogram) {
226 		++histindex;
227 		mirror.tid_end = histogram_ary[histindex].tid;
228 		estbytes = histogram_ary[histindex-1].bytes;
229 		mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end;
230 	} else {
231 		estbytes = 0;
232 	}
233 
234 	write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
235 		      &mrec_tmp, sizeof(mrec_tmp.pfs));
236 
237 	/*
238 	 * A cycle file overrides the beginning TID only if we are
239 	 * not operating in two-way or histogram mode.
240 	 */
241 	if (TwoWayPipeOpt == 0 && histogram == 0)
242 		hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
243 
244 	/*
245 	 * An additional argument overrides the beginning TID regardless
246 	 * of what mode we are in.  This is not recommending if operating
247 	 * in two-way mode.
248 	 */
249 	if (ac == 2)
250 		mirror.tid_beg = strtoull(av[1], NULL, 0);
251 
252 	if (streaming == 0 || VerboseOpt >= 2) {
253 		fprintf(stderr,
254 			"Mirror-read: Mirror %016jx to %016jx",
255 			(uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end);
256 		if (histogram)
257 			fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes);
258 		fprintf(stderr, "\n");
259 		fflush(stderr);
260 	}
261 	if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
262 		fprintf(stderr, "Mirror-read: Resuming at object %016jx\n",
263 			(uintmax_t)mirror.key_beg.obj_id);
264 	}
265 
266 	/*
267 	 * Nothing to do if begin equals end.
268 	 */
269 	if (mirror.tid_beg >= mirror.tid_end) {
270 		if (streaming == 0 || VerboseOpt >= 2)
271 			fprintf(stderr, "Mirror-read: No work to do\n");
272 		sleep(DelayOpt);
273 		didwork = 0;
274 		histogram = 0;
275 		goto done;
276 	}
277 	didwork = 1;
278 
279 	/*
280 	 * Write out bulk records
281 	 */
282 	mirror.ubuf = buf;
283 	mirror.size = SERIALBUF_SIZE;
284 
285 	do {
286 		mirror.count = 0;
287 		mirror.pfs_id = pfs.pfs_id;
288 		mirror.shared_uuid = pfs.ondisk->shared_uuid;
289 		if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
290 			score_printf(LINE3, "Mirror-read %s failed: %s",
291 				     filesystem, strerror(errno));
292 			err(1, "Mirror-read %s failed", filesystem);
293 		}
294 		if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
295 			score_printf(LINE3, "Mirror-read %s fatal error %d",
296 				     filesystem, mirror.head.error);
297 			errx(1, "Mirror-read %s fatal error %d",
298 				filesystem, mirror.head.error);
299 		}
300 		if (mirror.count) {
301 			if (BandwidthOpt) {
302 				n = writebw(1, mirror.ubuf, mirror.count,
303 					    &bwcount, &bwtv);
304 			} else {
305 				n = write(1, mirror.ubuf, mirror.count);
306 			}
307 			if (n != mirror.count) {
308 				score_printf(LINE3,
309 					     "Mirror-read %s failed: "
310 					     "short write",
311 					     filesystem);
312 				errx(1, "Mirror-read %s failed: short write",
313 					filesystem);
314 			}
315 		}
316 		total_bytes += mirror.count;
317 		if (streaming && VerboseOpt) {
318 			fprintf(stderr,
319 				"\rscan obj=%016jx tids=%016jx:%016jx %11jd",
320 				(uintmax_t)mirror.key_cur.obj_id,
321 				(uintmax_t)mirror.tid_beg,
322 				(uintmax_t)mirror.tid_end,
323 				(intmax_t)total_bytes);
324 			fflush(stderr);
325 			sameline = 0;
326 		} else if (streaming) {
327 			score_printf(LINE2,
328 				"obj=%016jx tids=%016jx:%016jx %11jd",
329 				(uintmax_t)mirror.key_cur.obj_id,
330 				(uintmax_t)mirror.tid_beg,
331 				(uintmax_t)mirror.tid_end,
332 				(intmax_t)total_bytes);
333 		}
334 		mirror.key_beg = mirror.key_cur;
335 
336 		/*
337 		 * Deal with time limit option
338 		 */
339 		if (TimeoutOpt &&
340 		    (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
341 			score_printf(LINE3,
342 				"Mirror-read %s interrupted by timer at"
343 				" %016jx",
344 				filesystem,
345 				(uintmax_t)mirror.key_cur.obj_id);
346 			fprintf(stderr,
347 				"Mirror-read %s interrupted by timer at"
348 				" %016jx\n",
349 				filesystem,
350 				(uintmax_t)mirror.key_cur.obj_id);
351 			interrupted = 1;
352 			break;
353 		}
354 	} while (mirror.count != 0);
355 
356 done:
357 	if (streaming && VerboseOpt && sameline == 0) {
358 		fprintf(stderr, "\n");
359 		fflush(stderr);
360 		sameline = 1;
361 	}
362 
363 	/*
364 	 * Write out the termination sync record - only if not interrupted
365 	 */
366 	if (interrupted == 0) {
367 		if (didwork) {
368 			write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
369 				      &mrec_tmp, sizeof(mrec_tmp.sync));
370 		} else {
371 			write_mrecord(1, HAMMER_MREC_TYPE_IDLE,
372 				      &mrec_tmp, sizeof(mrec_tmp.sync));
373 		}
374 	}
375 
376 	/*
377 	 * If the -2 option was given (automatic when doing mirror-copy),
378 	 * a two-way pipe is assumed and we expect a response mrec from
379 	 * the target.
380 	 */
381 	if (TwoWayPipeOpt) {
382 		mrec = read_mrecord(0, &error, &pickup);
383 		if (mrec == NULL ||
384 		    mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
385 		    mrec->head.rec_size != sizeof(mrec->update)) {
386 			errx(1, "mirror_read: Did not get final "
387 				"acknowledgement packet from target");
388 		}
389 		if (interrupted) {
390 			if (CyclePath) {
391 				hammer_set_cycle(&mirror.key_cur,
392 						 mirror.tid_beg);
393 				fprintf(stderr, "Cyclefile %s updated for "
394 					"continuation\n", CyclePath);
395 			}
396 		} else {
397 			sync_tid = mrec->update.tid;
398 			if (CyclePath) {
399 				hammer_key_beg_init(&mirror.key_beg);
400 				hammer_set_cycle(&mirror.key_beg, sync_tid);
401 				fprintf(stderr,
402 					"Cyclefile %s updated to 0x%016jx\n",
403 					CyclePath, (uintmax_t)sync_tid);
404 			}
405 		}
406 		free(mrec);
407 	} else if (CyclePath) {
408 		/* NOTE! mirror.tid_beg cannot be updated */
409 		fprintf(stderr, "Warning: cycle file (-c option) cannot be "
410 				"fully updated unless you use mirror-copy\n");
411 		hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
412 	}
413 	if (streaming && interrupted == 0) {
414 		time_t t1 = time(NULL);
415 		time_t t2;
416 
417 		/*
418 		 * Try to break down large bulk transfers into smaller ones
419 		 * so it can sync the transaction id on the slave.  This
420 		 * way if we get interrupted a restart doesn't have to
421 		 * start from scratch.
422 		 */
423 		if (streaming && histogram) {
424 			if (histindex != histmax) {
425 				if (VerboseOpt && VerboseOpt < 2 &&
426 				    streaming >= 0) {
427 					fprintf(stderr, " (bulk incremental)");
428 				}
429 				relpfs(fd, &pfs);
430 				goto again;
431 			}
432 		}
433 
434 		if (VerboseOpt && streaming >= 0) {
435 			fprintf(stderr, " W");
436 			fflush(stderr);
437 		} else if (streaming >= 0) {
438 			score_printf(LINE1, "Waiting");
439 		}
440 		pfs.ondisk->sync_end_tid = mirror.tid_end;
441 		if (streaming < 0) {
442 			/*
443 			 * Fake streaming mode when using a histogram to
444 			 * break up a mirror-read, do not wait on source.
445 			 */
446 			streaming = 0;
447 		} else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) {
448 			score_printf(LINE3,
449 				     "Mirror-read %s: cannot stream: %s\n",
450 				     filesystem, strerror(errno));
451 			fprintf(stderr,
452 				"Mirror-read %s: cannot stream: %s\n",
453 				filesystem, strerror(errno));
454 		} else {
455 			t2 = time(NULL) - t1;
456 			if (t2 >= 0 && t2 < DelayOpt) {
457 				if (VerboseOpt) {
458 					fprintf(stderr, "\bD");
459 					fflush(stderr);
460 				}
461 				sleep(DelayOpt - t2);
462 			}
463 			if (VerboseOpt) {
464 				fprintf(stderr, "\b ");
465 				fflush(stderr);
466 			}
467 			relpfs(fd, &pfs);
468 			goto again;
469 		}
470 	}
471 	write_mrecord(1, HAMMER_MREC_TYPE_TERM,
472 		      &mrec_tmp, sizeof(mrec_tmp.sync));
473 	relpfs(fd, &pfs);
474 	free(buf);
475 	free(histogram_ary);
476 	fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
477 }
478 
479 /*
480  * What we are trying to do here is figure out how much data is
481  * going to be sent for the TID range and to break the TID range
482  * down into reasonably-sized slices (from the point of view of
483  * data sent) so a lost connection can restart at a reasonable
484  * place and not all the way back at the beginning.
485  *
486  * An entry's TID serves as the end_tid for the prior entry
487  * So we have to offset the calculation by 1 so that TID falls into
488  * the previous entry when populating entries.
489  *
490  * Because the transaction id space is bursty we need a relatively
491  * large number of buckets (like a million) to do a reasonable job
492  * for things like an initial bulk mirrors on a very large filesystem.
493  */
494 #define HIST_COUNT	(1024 * 1024)
495 
496 static int
497 generate_histogram(int fd, const char *filesystem,
498 		   histogram_t *histogram_ary,
499 		   struct hammer_ioc_mirror_rw *mirror_base,
500 		   int *repeatp)
501 {
502 	struct hammer_ioc_mirror_rw mirror;
503 	union hammer_ioc_mrecord_any *mrec;
504 	hammer_tid_t tid_beg;
505 	hammer_tid_t tid_end;
506 	hammer_tid_t tid;
507 	hammer_tid_t tidx;
508 	uint64_t *tid_bytes;
509 	uint64_t total;
510 	uint64_t accum;
511 	int chunkno;
512 	int i;
513 	int res;
514 	int off;
515 	int len;
516 
517 	mirror = *mirror_base;
518 	tid_beg = mirror.tid_beg;
519 	tid_end = mirror.tid_end;
520 	mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA;
521 
522 	if (*histogram_ary == NULL) {
523 		*histogram_ary = malloc(sizeof(struct histogram) *
524 					(HIST_COUNT + 2));
525 	}
526 	if (tid_beg >= tid_end)
527 		return(0);
528 
529 	/* needs 2 extra */
530 	tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2));
531 	bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2));
532 
533 	if (*repeatp == 0) {
534 		fprintf(stderr, "Prescan to break up bulk transfer");
535 		if (VerboseOpt > 1)
536 			fprintf(stderr, " (%juMB chunks)",
537 				(uintmax_t)(SplitupOpt / (1024 * 1024)));
538 		fprintf(stderr, "\n");
539 	}
540 
541 	/*
542 	 * Note: (tid_beg,tid_end), range is inclusive of both beg & end.
543 	 *
544 	 * Note: Estimates can be off when the mirror is way behind due
545 	 *	 to skips.
546 	 */
547 	total = 0;
548 	accum = 0;
549 	chunkno = 0;
550 	for (;;) {
551 		mirror.count = 0;
552 		if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0)
553 			err(1, "Mirror-read %s failed", filesystem);
554 		if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR)
555 			errx(1, "Mirror-read %s fatal error %d",
556 				filesystem, mirror.head.error);
557 		for (off = 0;
558 		     off < mirror.count;
559 		     off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) {
560 			mrec = (void *)((char *)mirror.ubuf + off);
561 
562 			/*
563 			 * We only care about general RECs and PASS
564 			 * records.  We ignore SKIPs.
565 			 */
566 			switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) {
567 			case HAMMER_MREC_TYPE_REC:
568 			case HAMMER_MREC_TYPE_PASS:
569 				break;
570 			default:
571 				continue;
572 			}
573 
574 			/*
575 			 * Calculate for two indices, create_tid and
576 			 * delete_tid.  Record data only applies to
577 			 * the create_tid.
578 			 *
579 			 * When tid is exactly on the boundary it really
580 			 * belongs to the previous entry because scans
581 			 * are inclusive of the ending entry.
582 			 */
583 			tid = mrec->rec.leaf.base.delete_tid;
584 			if (tid && tid >= tid_beg && tid <= tid_end) {
585 				len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
586 				if (mrec->head.type ==
587 				    HAMMER_MREC_TYPE_REC) {
588 					len -= HAMMER_HEAD_DOALIGN(
589 						    mrec->rec.leaf.data_len);
590 					assert(len > 0);
591 				}
592 				i = (tid - tid_beg) * HIST_COUNT /
593 				    (tid_end - tid_beg);
594 				tidx = tid_beg + i * (tid_end - tid_beg) /
595 						 HIST_COUNT;
596 				if (tid == tidx && i)
597 					--i;
598 				assert(i >= 0 && i < HIST_COUNT);
599 				tid_bytes[i] += len;
600 				total += len;
601 				accum += len;
602 			}
603 
604 			tid = mrec->rec.leaf.base.create_tid;
605 			if (tid && tid >= tid_beg && tid <= tid_end) {
606 				len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
607 				if (mrec->head.type ==
608 				    HAMMER_MREC_TYPE_REC_NODATA) {
609 					len += HAMMER_HEAD_DOALIGN(
610 						    mrec->rec.leaf.data_len);
611 				}
612 				i = (tid - tid_beg) * HIST_COUNT /
613 				    (tid_end - tid_beg);
614 				tidx = tid_beg + i * (tid_end - tid_beg) /
615 						 HIST_COUNT;
616 				if (tid == tidx && i)
617 					--i;
618 				assert(i >= 0 && i < HIST_COUNT);
619 				tid_bytes[i] += len;
620 				total += len;
621 				accum += len;
622 			}
623 		}
624 		if (*repeatp == 0 && accum > SplitupOpt) {
625 			if (VerboseOpt > 1) {
626 				fprintf(stderr, ".");
627 				fflush(stderr);
628 			}
629 			++chunkno;
630 			score_printf(LINE2, "Prescan chunk %d", chunkno);
631 			accum = 0;
632 		}
633 		if (mirror.count == 0)
634 			break;
635 		mirror.key_beg = mirror.key_cur;
636 	}
637 
638 	/*
639 	 * Reduce to SplitupOpt (default 4GB) chunks.  This code may
640 	 * use up to two additional elements.  Do the array in-place.
641 	 *
642 	 * Inefficient degenerate cases can occur if we do not accumulate
643 	 * at least the requested split amount, so error on the side of
644 	 * going over a bit.
645 	 */
646 	res = 0;
647 	(*histogram_ary)[res].tid = tid_beg;
648 	(*histogram_ary)[res].bytes = tid_bytes[0];
649 	for (i = 1; i < HIST_COUNT; ++i) {
650 		if ((*histogram_ary)[res].bytes >= SplitupOpt) {
651 			++res;
652 			(*histogram_ary)[res].tid = tid_beg +
653 					i * (tid_end - tid_beg) /
654 					HIST_COUNT;
655 			(*histogram_ary)[res].bytes = 0;
656 
657 		}
658 		(*histogram_ary)[res].bytes += tid_bytes[i];
659 	}
660 	++res;
661 	(*histogram_ary)[res].tid = tid_end;
662 	(*histogram_ary)[res].bytes = -1;
663 
664 	if (*repeatp == 0) {
665 		if (VerboseOpt > 1)
666 			fprintf(stderr, "\n");	/* newline after ... */
667 		score_printf(LINE3, "Prescan %d chunks, total %ju MBytes",
668 			res, (uintmax_t)total / (1024 * 1024));
669 		fprintf(stderr, "Prescan %d chunks, total %ju MBytes (",
670 			res, (uintmax_t)total / (1024 * 1024));
671 		for (i = 0; i < res && i < 3; ++i) {
672 			if (i)
673 				fprintf(stderr, ", ");
674 			fprintf(stderr, "%ju",
675 				(uintmax_t)(*histogram_ary)[i].bytes);
676 		}
677 		if (i < res)
678 			fprintf(stderr, ", ...");
679 		fprintf(stderr, ")\n");
680 	}
681 	assert(res <= HIST_COUNT);
682 	*repeatp = 1;
683 
684 	free(tid_bytes);
685 	return(res);
686 }
687 
688 static void
689 create_pfs(const char *filesystem, uuid_t *s_uuid)
690 {
691 	if (ForceYesOpt == 1) {
692 		fprintf(stderr, "PFS slave %s does not exist. "
693 			"Auto create new slave PFS!\n", filesystem);
694 
695 	} else {
696 		fprintf(stderr, "PFS slave %s does not exist.\n"
697 			"Do you want to create a new slave PFS? [y/n] ",
698 			filesystem);
699 		fflush(stderr);
700 		if (getyntty() != 1)
701 			errx(1, "Aborting operation");
702 	}
703 
704 	uint32_t status;
705 	char *shared_uuid = NULL;
706 	uuid_to_string(s_uuid, &shared_uuid, &status);
707 
708 	char *cmd = NULL;
709 	asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2",
710 		 filesystem, shared_uuid);
711 	free(shared_uuid);
712 
713 	if (cmd == NULL)
714 		errx(1, "Failed to alloc memory");
715 	if (system(cmd) != 0)
716 		fprintf(stderr, "Failed to create PFS\n");
717 	free(cmd);
718 }
719 
720 /*
721  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
722  * some additional packet types to negotiate TID ranges and to verify
723  * completion.  The HAMMER VFS does most of the work.
724  *
725  * It is important to note that the mirror.key_{beg,end} range must
726  * match the ranged used by the original.  For now both sides use
727  * range the entire key space.
728  *
729  * It is even more important that the records in the stream conform
730  * to the TID range also supplied in the stream.  The HAMMER VFS will
731  * use the REC, PASS, and SKIP record types to track the portions of
732  * the B-Tree being scanned in order to be able to proactively delete
733  * records on the target within those active areas that are not mentioned
734  * by the source.
735  *
736  * The mirror.key_cur field is used by the VFS to do this tracking.  It
737  * must be initialized to key_beg but then is persistently updated by
738  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
739  * field you will blow up the mirror target, possibly to the point of
740  * deleting everything.  As a safety measure the HAMMER VFS simply marks
741  * the records that the source has destroyed as deleted on the target,
742  * and normal pruning operations will deal with their final disposition
743  * at some later time.
744  */
745 void
746 hammer_cmd_mirror_write(char **av, int ac)
747 {
748 	struct hammer_ioc_mirror_rw mirror;
749 	const char *filesystem;
750 	char *buf = malloc(SERIALBUF_SIZE);
751 	struct hammer_ioc_pseudofs_rw pfs;
752 	struct hammer_ioc_mrecord_head pickup;
753 	struct hammer_ioc_synctid synctid;
754 	union hammer_ioc_mrecord_any mrec_tmp;
755 	hammer_ioc_mrecord_any_t mrec;
756 	struct stat st;
757 	int error;
758 	int fd;
759 	int n;
760 
761 	if (ac != 1)
762 		mirror_usage(1);
763 	filesystem = av[0];
764 	hammer_check_restrict(filesystem);
765 
766 	pickup.signature = 0;
767 	pickup.type = 0;
768 
769 again:
770 	bzero(&mirror, sizeof(mirror));
771 	hammer_key_beg_init(&mirror.key_beg);
772 	hammer_key_end_init(&mirror.key_end);
773 	mirror.key_end = mirror.key_beg;
774 
775 	/*
776 	 * Read initial packet
777 	 */
778 	mrec = read_mrecord(0, &error, &pickup);
779 	if (mrec == NULL) {
780 		if (error == 0)
781 			errx(1, "validate_mrec_header: short read");
782 		exit(1);
783 	}
784 	/*
785 	 * Validate packet
786 	 */
787 	if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
788 		free(buf);
789 		return;
790 	}
791 	if (mrec->head.type != HAMMER_MREC_TYPE_PFSD)
792 		errx(1, "validate_mrec_header: did not get expected "
793 			"PFSD record type");
794 	if (mrec->head.rec_size != sizeof(mrec->pfs))
795 		errx(1, "validate_mrec_header: unexpected payload size");
796 
797 	/*
798 	 * Create slave PFS if it doesn't yet exist
799 	 */
800 	if (lstat(filesystem, &st) != 0)
801 		create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid);
802 	free(mrec);
803 	mrec = NULL;
804 
805 	fd = getpfs(&pfs, filesystem);
806 
807 	/*
808 	 * In two-way mode the target writes out a PFS packet first.
809 	 * The source uses our tid_end as its tid_beg by default,
810 	 * picking up where it left off.
811 	 */
812 	mirror.tid_beg = 0;
813 	if (TwoWayPipeOpt) {
814 		generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
815 		if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid)
816 			mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid;
817 		mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid;
818 		write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
819 			      &mrec_tmp, sizeof(mrec_tmp.pfs));
820 	}
821 
822 	/*
823 	 * Read and process the PFS header.  The source informs us of
824 	 * the TID range the stream represents.
825 	 */
826 	n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup,
827 				 &mirror.tid_beg, &mirror.tid_end);
828 	if (n < 0) {	/* got TERM record */
829 		relpfs(fd, &pfs);
830 		free(buf);
831 		return;
832 	}
833 
834 	mirror.ubuf = buf;
835 	mirror.size = SERIALBUF_SIZE;
836 
837 	/*
838 	 * Read and process bulk records (REC, PASS, and SKIP types).
839 	 *
840 	 * On your life, do NOT mess with mirror.key_cur or your mirror
841 	 * target may become history.
842 	 */
843 	for (;;) {
844 		mirror.count = 0;
845 		mirror.pfs_id = pfs.pfs_id;
846 		mirror.shared_uuid = pfs.ondisk->shared_uuid;
847 		mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
848 		if (mirror.size <= 0)
849 			break;
850 		if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0)
851 			err(1, "Mirror-write %s failed", filesystem);
852 		if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR)
853 			errx(1, "Mirror-write %s fatal error %d",
854 				filesystem, mirror.head.error);
855 #if 0
856 		if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
857 			errx(1, "Mirror-write %s interrupted by timer at"
858 				" %016llx",
859 				filesystem,
860 				mirror.key_cur.obj_id);
861 		}
862 #endif
863 	}
864 
865 	/*
866 	 * Read and process the termination sync record.
867 	 */
868 	mrec = read_mrecord(0, &error, &pickup);
869 
870 	if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
871 		fprintf(stderr, "Mirror-write: received termination request\n");
872 		relpfs(fd, &pfs);
873 		free(mrec);
874 		free(buf);
875 		return;
876 	}
877 
878 	if (mrec == NULL ||
879 	    (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
880 	     mrec->head.type != HAMMER_MREC_TYPE_IDLE) ||
881 	    mrec->head.rec_size != sizeof(mrec->sync)) {
882 		errx(1, "Mirror-write %s: Did not get termination "
883 			"sync record, or rec_size is wrong rt=%d",
884 			filesystem, (mrec ? (int)mrec->head.type : -1));
885 	}
886 
887 	/*
888 	 * Update the PFS info on the target so the user has visibility
889 	 * into the new snapshot, and sync the target filesystem.
890 	 */
891 	if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) {
892 		update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
893 
894 		bzero(&synctid, sizeof(synctid));
895 		synctid.op = HAMMER_SYNCTID_SYNC2;
896 		ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
897 
898 		if (VerboseOpt >= 2) {
899 			fprintf(stderr, "Mirror-write %s: succeeded\n",
900 				filesystem);
901 		}
902 	}
903 
904 	free(mrec);
905 	mrec = NULL;
906 
907 	/*
908 	 * Report back to the originator.
909 	 */
910 	if (TwoWayPipeOpt) {
911 		mrec_tmp.update.tid = mirror.tid_end;
912 		write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
913 			      &mrec_tmp, sizeof(mrec_tmp.update));
914 	} else {
915 		printf("Source can update synctid to 0x%016jx\n",
916 		       (uintmax_t)mirror.tid_end);
917 	}
918 	relpfs(fd, &pfs);
919 	goto again;
920 }
921 
922 void
923 hammer_cmd_mirror_dump(char **av, int ac)
924 {
925 	char *buf = malloc(SERIALBUF_SIZE);
926 	struct hammer_ioc_mrecord_head pickup;
927 	hammer_ioc_mrecord_any_t mrec;
928 	int error;
929 	int size;
930 	int offset;
931 	int bytes;
932 	int header_only = 0;
933 
934 	if (ac == 1 && strcmp(*av, "header") == 0)
935 		header_only = 1;
936 	else if (ac != 0)
937 		mirror_usage(1);
938 
939 	/*
940 	 * Read and process the PFS header
941 	 */
942 	pickup.signature = 0;
943 	pickup.type = 0;
944 
945 	mrec = read_mrecord(0, &error, &pickup);
946 
947 	/*
948 	 * Dump the PFS header. mirror-dump takes its input from the output
949 	 * of a mirror-read so getpfs() can't be used to get a fd to be passed
950 	 * to dump_pfsd().
951 	 */
952 	if (header_only && mrec != NULL) {
953 		dump_pfsd(&mrec->pfs.pfsd, -1);
954 		free(mrec);
955 		free(buf);
956 		return;
957 	}
958 	free(mrec);
959 
960 again:
961 	/*
962 	 * Read and process bulk records
963 	 */
964 	for (;;) {
965 		size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
966 		if (size <= 0)
967 			break;
968 		offset = 0;
969 		while (offset < size) {
970 			mrec = (void *)((char *)buf + offset);
971 			bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
972 			if (offset + bytes > size)
973 				errx(1, "Misaligned record");
974 
975 			switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) {
976 			case HAMMER_MREC_TYPE_REC_BADCRC:
977 			case HAMMER_MREC_TYPE_REC:
978 				printf("Record lo=%08x obj=%016jx key=%016jx "
979 				       "rt=%02x ot=%02x",
980 				        mrec->rec.leaf.base.localization,
981 					(uintmax_t)mrec->rec.leaf.base.obj_id,
982 					(uintmax_t)mrec->rec.leaf.base.key,
983 					mrec->rec.leaf.base.rec_type,
984 					mrec->rec.leaf.base.obj_type);
985 				if (mrec->head.type ==
986 				    HAMMER_MREC_TYPE_REC_BADCRC) {
987 					printf(" (BAD CRC)");
988 				}
989 				printf("\n");
990 				printf("       tids %016jx:%016jx data=%d\n",
991 				    (uintmax_t)mrec->rec.leaf.base.create_tid,
992 				    (uintmax_t)mrec->rec.leaf.base.delete_tid,
993 				    mrec->rec.leaf.data_len);
994 				break;
995 			case HAMMER_MREC_TYPE_PASS:
996 				printf("Pass   lo=%08x obj=%016jx key=%016jx "
997 				       "rt=%02x ot=%02x\n",
998 				        mrec->rec.leaf.base.localization,
999 					(uintmax_t)mrec->rec.leaf.base.obj_id,
1000 					(uintmax_t)mrec->rec.leaf.base.key,
1001 					mrec->rec.leaf.base.rec_type,
1002 					mrec->rec.leaf.base.obj_type);
1003 				printf("       tids %016jx:%016jx data=%d\n",
1004 				    (uintmax_t)mrec->rec.leaf.base.create_tid,
1005 				    (uintmax_t)mrec->rec.leaf.base.delete_tid,
1006 					mrec->rec.leaf.data_len);
1007 				break;
1008 			case HAMMER_MREC_TYPE_SKIP:
1009 				printf("Skip   lo=%08x obj=%016jx key=%016jx rt=%02x to\n"
1010 				       "       lo=%08x obj=%016jx key=%016jx rt=%02x\n",
1011 				       mrec->skip.skip_beg.localization,
1012 				       (uintmax_t)mrec->skip.skip_beg.obj_id,
1013 				       (uintmax_t)mrec->skip.skip_beg.key,
1014 				       mrec->skip.skip_beg.rec_type,
1015 				       mrec->skip.skip_end.localization,
1016 				       (uintmax_t)mrec->skip.skip_end.obj_id,
1017 				       (uintmax_t)mrec->skip.skip_end.key,
1018 				       mrec->skip.skip_end.rec_type);
1019 			default:
1020 				break;
1021 			}
1022 			offset += bytes;
1023 		}
1024 	}
1025 
1026 	/*
1027 	 * Read and process the termination sync record.
1028 	 */
1029 	mrec = read_mrecord(0, &error, &pickup);
1030 	if (mrec == NULL ||
1031 	    (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
1032 	     mrec->head.type != HAMMER_MREC_TYPE_IDLE)) {
1033 		fprintf(stderr, "Mirror-dump: Did not get termination "
1034 				"sync record\n");
1035 	}
1036 	free(mrec);
1037 
1038 	/*
1039 	 * Continue with more batches until EOF.
1040 	 */
1041 	mrec = read_mrecord(0, &error, &pickup);
1042 	if (mrec) {
1043 		free(mrec);
1044 		goto again;
1045 	}
1046 	free(buf);
1047 }
1048 
1049 void
1050 hammer_cmd_mirror_copy(char **av, int ac, int streaming)
1051 {
1052 	pid_t pid1;
1053 	pid_t pid2;
1054 	int fds[2];
1055 	const char *xav[32];
1056 	char tbuf[16];
1057 	char *sh, *user, *host, *rfs;
1058 	int xac;
1059 
1060 	if (ac != 2)
1061 		mirror_usage(1);
1062 
1063 	TwoWayPipeOpt = 1;
1064 	signal(SIGPIPE, SIG_IGN);
1065 
1066 again:
1067 	if (pipe(fds) < 0)
1068 		err(1, "pipe");
1069 
1070 	/*
1071 	 * Source
1072 	 */
1073 	if ((pid1 = fork()) == 0) {
1074 		signal(SIGPIPE, SIG_DFL);
1075 		dup2(fds[0], 0);
1076 		dup2(fds[0], 1);
1077 		close(fds[0]);
1078 		close(fds[1]);
1079 		if ((rfs = strchr(av[0], ':')) != NULL) {
1080 			xac = 0;
1081 
1082 			if((sh = getenv("HAMMER_RSH")) == NULL)
1083 				xav[xac++] = "ssh";
1084 			else
1085 				xav[xac++] = sh;
1086 
1087 			if (CompressOpt)
1088 				xav[xac++] = "-C";
1089 
1090 			if ((host = strchr(av[0], '@')) != NULL) {
1091 				user = strndup( av[0], (host++ - av[0]));
1092 				host = strndup( host, (rfs++ - host));
1093 				xav[xac++] = "-l";
1094 				xav[xac++] = user;
1095 				xav[xac++] = host;
1096 			} else {
1097 				host = strndup( av[0], (rfs++ - av[0]));
1098 				user = NULL;
1099 				xav[xac++] = host;
1100 			}
1101 
1102 
1103 			if (SshPort) {
1104 				xav[xac++] = "-p";
1105 				xav[xac++] = SshPort;
1106 			}
1107 
1108 			xav[xac++] = "hammer";
1109 
1110 			switch(VerboseOpt) {
1111 			case 0:
1112 				break;
1113 			case 1:
1114 				xav[xac++] = "-v";
1115 				break;
1116 			case 2:
1117 				xav[xac++] = "-vv";
1118 				break;
1119 			default:
1120 				xav[xac++] = "-vvv";
1121 				break;
1122 			}
1123 			if (ForceYesOpt)
1124 				xav[xac++] = "-y";
1125 			xav[xac++] = "-2";
1126 			if (TimeoutOpt) {
1127 				snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
1128 				xav[xac++] = "-t";
1129 				xav[xac++] = tbuf;
1130 			}
1131 			if (SplitupOptStr) {
1132 				xav[xac++] = "-S";
1133 				xav[xac++] = SplitupOptStr;
1134 			}
1135 			if (streaming)
1136 				xav[xac++] = "mirror-read-stream";
1137 			else
1138 				xav[xac++] = "mirror-read";
1139 			xav[xac++] = rfs;
1140 			xav[xac++] = NULL;
1141 			execvp(*xav, (void *)xav);
1142 		} else {
1143 			hammer_cmd_mirror_read(av, 1, streaming);
1144 			fflush(stdout);
1145 			fflush(stderr);
1146 		}
1147 		_exit(1);
1148 	}
1149 
1150 	/*
1151 	 * Target
1152 	 */
1153 	if ((pid2 = fork()) == 0) {
1154 		signal(SIGPIPE, SIG_DFL);
1155 		dup2(fds[1], 0);
1156 		dup2(fds[1], 1);
1157 		close(fds[0]);
1158 		close(fds[1]);
1159 		if ((rfs = strchr(av[1], ':')) != NULL) {
1160 			xac = 0;
1161 
1162 			if((sh = getenv("HAMMER_RSH")) == NULL)
1163 				xav[xac++] = "ssh";
1164 			else
1165 				xav[xac++] = sh;
1166 
1167 			if (CompressOpt)
1168 				xav[xac++] = "-C";
1169 
1170 			if ((host = strchr(av[1], '@')) != NULL) {
1171 				user = strndup( av[1], (host++ - av[1]));
1172 				host = strndup( host, (rfs++ - host));
1173 				xav[xac++] = "-l";
1174 				xav[xac++] = user;
1175 				xav[xac++] = host;
1176 			} else {
1177 				host = strndup( av[1], (rfs++ - av[1]));
1178 				user = NULL;
1179 				xav[xac++] = host;
1180 			}
1181 
1182 			if (SshPort) {
1183 				xav[xac++] = "-p";
1184 				xav[xac++] = SshPort;
1185 			}
1186 
1187 			xav[xac++] = "hammer";
1188 
1189 			switch(VerboseOpt) {
1190 			case 0:
1191 				break;
1192 			case 1:
1193 				xav[xac++] = "-v";
1194 				break;
1195 			case 2:
1196 				xav[xac++] = "-vv";
1197 				break;
1198 			default:
1199 				xav[xac++] = "-vvv";
1200 				break;
1201 			}
1202 			if (ForceYesOpt)
1203 				xav[xac++] = "-y";
1204 			xav[xac++] = "-2";
1205 			xav[xac++] = "mirror-write";
1206 			xav[xac++] = rfs;
1207 			xav[xac++] = NULL;
1208 			execvp(*xav, (void *)xav);
1209 		} else {
1210 			hammer_cmd_mirror_write(av + 1, 1);
1211 			fflush(stdout);
1212 			fflush(stderr);
1213 		}
1214 		_exit(1);
1215 	}
1216 	close(fds[0]);
1217 	close(fds[1]);
1218 
1219 	while (waitpid(pid1, NULL, 0) <= 0)
1220 		;
1221 	while (waitpid(pid2, NULL, 0) <= 0)
1222 		;
1223 
1224 	/*
1225 	 * If the link is lost restart
1226 	 */
1227 	if (streaming) {
1228 		if (VerboseOpt) {
1229 			fprintf(stderr, "\nLost Link\n");
1230 			fflush(stderr);
1231 		}
1232 		sleep(15 + DelayOpt);
1233 		goto again;
1234 	}
1235 
1236 }
1237 
1238 /*
1239  * Read and return multiple mrecords
1240  */
1241 static int
1242 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
1243 {
1244 	hammer_ioc_mrecord_any_t mrec;
1245 	u_int count;
1246 	size_t n;
1247 	size_t i;
1248 	size_t bytes;
1249 	int type;
1250 
1251 	count = 0;
1252 	while (size - count >= HAMMER_MREC_HEADSIZE) {
1253 		/*
1254 		 * Cached the record header in case we run out of buffer
1255 		 * space.
1256 		 */
1257 		fflush(stdout);
1258 		if (pickup->signature == 0) {
1259 			for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
1260 				i = read(fd, (char *)pickup + n,
1261 					 HAMMER_MREC_HEADSIZE - n);
1262 				if (i <= 0)
1263 					break;
1264 			}
1265 			if (n == 0)
1266 				break;
1267 			if (n != HAMMER_MREC_HEADSIZE)
1268 				errx(1, "read_mrecords: short read on pipe");
1269 			if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE)
1270 				errx(1, "read_mrecords: malformed record on pipe, "
1271 					"bad signature");
1272 		}
1273 		if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
1274 		    pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE)
1275 			errx(1, "read_mrecords: malformed record on pipe, "
1276 				"illegal rec_size");
1277 
1278 		/*
1279 		 * Stop if we have insufficient space for the record and data.
1280 		 */
1281 		bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
1282 		if (size - count < bytes)
1283 			break;
1284 
1285 		/*
1286 		 * Stop if the record type is not a REC, SKIP, or PASS,
1287 		 * which are the only types the ioctl supports.  Other types
1288 		 * are used only by the userland protocol.
1289 		 *
1290 		 * Ignore all flags.
1291 		 */
1292 		type = pickup->type & HAMMER_MRECF_TYPE_LOMASK;
1293 		if (type != HAMMER_MREC_TYPE_PFSD &&
1294 		    type != HAMMER_MREC_TYPE_REC &&
1295 		    type != HAMMER_MREC_TYPE_SKIP &&
1296 		    type != HAMMER_MREC_TYPE_PASS) {
1297 			break;
1298 		}
1299 
1300 		/*
1301 		 * Read the remainder and clear the pickup signature.
1302 		 */
1303 		for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
1304 			i = read(fd, buf + count + n, bytes - n);
1305 			if (i <= 0)
1306 				break;
1307 		}
1308 		if (n != bytes)
1309 			errx(1, "read_mrecords: short read on pipe");
1310 
1311 		bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
1312 		pickup->signature = 0;
1313 		pickup->type = 0;
1314 		mrec = (void *)(buf + count);
1315 
1316 		/*
1317 		 * Validate the completed record
1318 		 */
1319 		if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size))
1320 			errx(1, "read_mrecords: malformed record on pipe, bad crc");
1321 
1322 		/*
1323 		 * If its a B-Tree record validate the data crc.
1324 		 *
1325 		 * NOTE: If the VFS passes us an explicitly errorde mrec
1326 		 *	 we just pass it through.
1327 		 */
1328 		type = mrec->head.type & HAMMER_MRECF_TYPE_MASK;
1329 
1330 		if (type == HAMMER_MREC_TYPE_REC) {
1331 			if (mrec->head.rec_size <
1332 			    sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
1333 				errx(1, "read_mrecords: malformed record on "
1334 					"pipe, illegal element data_len");
1335 			}
1336 			if (mrec->rec.leaf.data_len &&
1337 			    mrec->rec.leaf.data_offset &&
1338 			    hammer_crc_test_leaf(HammerVersion, &mrec->rec + 1, &mrec->rec.leaf) == 0) {
1339 				fprintf(stderr,
1340 					"read_mrecords: data_crc did not "
1341 					"match data! obj=%016jx key=%016jx\n",
1342 					(uintmax_t)mrec->rec.leaf.base.obj_id,
1343 					(uintmax_t)mrec->rec.leaf.base.key);
1344 				fprintf(stderr,
1345 					"continuing, but there are problems\n");
1346 			}
1347 		}
1348 		count += bytes;
1349 	}
1350 	return(count);
1351 }
1352 
1353 /*
1354  * Read and return a single mrecord.
1355  */
1356 static
1357 hammer_ioc_mrecord_any_t
1358 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
1359 {
1360 	hammer_ioc_mrecord_any_t mrec;
1361 	struct hammer_ioc_mrecord_head mrechd;
1362 	size_t bytes;
1363 	size_t n;
1364 	size_t i;
1365 
1366 	if (pickup && pickup->type != 0) {
1367 		mrechd = *pickup;
1368 		pickup->signature = 0;
1369 		pickup->type = 0;
1370 		n = HAMMER_MREC_HEADSIZE;
1371 	} else {
1372 		/*
1373 		 * Read in the PFSD header from the sender.
1374 		 */
1375 		for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
1376 			i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
1377 			if (i <= 0)
1378 				break;
1379 		}
1380 		if (n == 0) {
1381 			*errorp = 0;	/* EOF */
1382 			return(NULL);
1383 		}
1384 		if (n != HAMMER_MREC_HEADSIZE) {
1385 			fprintf(stderr, "short read of mrecord header\n");
1386 			*errorp = EPIPE;
1387 			return(NULL);
1388 		}
1389 	}
1390 	if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
1391 		fprintf(stderr, "read_mrecord: bad signature\n");
1392 		*errorp = EINVAL;
1393 		return(NULL);
1394 	}
1395 	bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
1396 	assert(bytes >= sizeof(mrechd));
1397 	mrec = malloc(bytes);
1398 	mrec->head = mrechd;
1399 
1400 	while (n < bytes) {
1401 		i = read(fdin, (char *)mrec + n, bytes - n);
1402 		if (i <= 0)
1403 			break;
1404 		n += i;
1405 	}
1406 	if (n != bytes) {
1407 		fprintf(stderr, "read_mrecord: short read on payload\n");
1408 		*errorp = EPIPE;
1409 		return(NULL);
1410 	}
1411 	if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) {
1412 		fprintf(stderr, "read_mrecord: bad CRC\n");
1413 		*errorp = EINVAL;
1414 		return(NULL);
1415 	}
1416 	*errorp = 0;
1417 	return(mrec);
1418 }
1419 
1420 static
1421 void
1422 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec,
1423 	      int bytes)
1424 {
1425 	char zbuf[HAMMER_HEAD_ALIGN];
1426 	int pad;
1427 
1428 	pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
1429 
1430 	assert(bytes >= (int)sizeof(mrec->head));
1431 	bzero(&mrec->head, sizeof(mrec->head));
1432 	mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
1433 	mrec->head.type = type;
1434 	mrec->head.rec_size = bytes;
1435 	hammer_crc_set_mrec_head(&mrec->head, bytes);
1436 	if (write(fdout, mrec, bytes) != bytes)
1437 		err(1, "write_mrecord");
1438 	if (pad) {
1439 		bzero(zbuf, pad);
1440 		if (write(fdout, zbuf, pad) != pad)
1441 			err(1, "write_mrecord");
1442 	}
1443 }
1444 
1445 /*
1446  * Generate a mirroring header with the pfs information of the
1447  * originating filesytem.
1448  */
1449 static void
1450 generate_mrec_header(int fd, int pfs_id,
1451 		     union hammer_ioc_mrecord_any *mrec_tmp)
1452 {
1453 	struct hammer_ioc_pseudofs_rw pfs;
1454 
1455 	bzero(mrec_tmp, sizeof(*mrec_tmp));
1456 	clrpfs(&pfs, &mrec_tmp->pfs.pfsd, pfs_id);
1457 
1458 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0)
1459 		err(1, "Mirror-read: not a HAMMER fs/pseudofs!");
1460 	if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION)
1461 		errx(1, "Mirror-read: HAMMER PFS version mismatch!");
1462 	mrec_tmp->pfs.version = pfs.version;
1463 }
1464 
1465 /*
1466  * Validate the pfs information from the originating filesystem
1467  * against the target filesystem.  shared_uuid must match.
1468  *
1469  * return -1 if we got a TERM record
1470  */
1471 static int
1472 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
1473 		     struct hammer_ioc_mrecord_head *pickup,
1474 		     hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
1475 {
1476 	struct hammer_ioc_pseudofs_rw pfs;
1477 	struct hammer_pseudofs_data pfsd;
1478 	hammer_ioc_mrecord_any_t mrec;
1479 	int error;
1480 
1481 	/*
1482 	 * Get the PFSD info from the target filesystem.
1483 	 */
1484 	clrpfs(&pfs, &pfsd, pfs_id);
1485 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0)
1486 		err(1, "mirror-write: not a HAMMER fs/pseudofs!");
1487 	if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION)
1488 		errx(1, "mirror-write: HAMMER PFS version mismatch!");
1489 
1490 	mrec = read_mrecord(fdin, &error, pickup);
1491 	if (mrec == NULL) {
1492 		if (error == 0)
1493 			errx(1, "validate_mrec_header: short read");
1494 		exit(1);
1495 	}
1496 	if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
1497 		free(mrec);
1498 		return(-1);
1499 	}
1500 
1501 	if (mrec->head.type != HAMMER_MREC_TYPE_PFSD)
1502 		errx(1, "validate_mrec_header: did not get expected "
1503 			"PFSD record type");
1504 	if (mrec->head.rec_size != sizeof(mrec->pfs))
1505 		errx(1, "validate_mrec_header: unexpected payload size");
1506 	if (mrec->pfs.version != pfs.version)
1507 		errx(1, "validate_mrec_header: Version mismatch");
1508 
1509 	/*
1510 	 * Whew.  Ok, is the read PFS info compatible with the target?
1511 	 */
1512 	if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
1513 		sizeof(pfsd.shared_uuid)) != 0)
1514 		errx(1, "mirror-write: source and target have "
1515 			"different shared-uuid's!");
1516 	if (is_target && hammer_is_pfs_master(&pfsd))
1517 		errx(1, "mirror-write: target must be in slave mode");
1518 	if (tid_begp)
1519 		*tid_begp = mrec->pfs.pfsd.sync_beg_tid;
1520 	if (tid_endp)
1521 		*tid_endp = mrec->pfs.pfsd.sync_end_tid;
1522 	free(mrec);
1523 	return(0);
1524 }
1525 
1526 static void
1527 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
1528 {
1529 	struct hammer_ioc_pseudofs_rw pfs;
1530 	struct hammer_pseudofs_data pfsd;
1531 
1532 	clrpfs(&pfs, &pfsd, pfs_id);
1533 	if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0)
1534 		err(1, "update_pfs_snapshot (read)");
1535 
1536 	if (pfsd.sync_end_tid != snapshot_tid) {
1537 		pfsd.sync_end_tid = snapshot_tid;
1538 		if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0)
1539 			err(1, "update_pfs_snapshot (rewrite)");
1540 		if (VerboseOpt >= 2) {
1541 			fprintf(stderr,
1542 				"Mirror-write: Completed, updated snapshot "
1543 				"to %016jx\n",
1544 				(uintmax_t)snapshot_tid);
1545 			fflush(stderr);
1546 		}
1547 	}
1548 }
1549 
1550 /*
1551  * Bandwidth-limited write in chunks
1552  */
1553 static
1554 ssize_t
1555 writebw(int fd, const void *buf, size_t nbytes,
1556 	uint64_t *bwcount, struct timeval *tv1)
1557 {
1558 	struct timeval tv2;
1559 	size_t n;
1560 	ssize_t r;
1561 	ssize_t a;
1562 	int usec;
1563 
1564 	a = 0;
1565 	r = 0;
1566 	while (nbytes) {
1567 		if (*bwcount + nbytes > BandwidthOpt)
1568 			n = BandwidthOpt - *bwcount;
1569 		else
1570 			n = nbytes;
1571 		if (n)
1572 			r = write(fd, buf, n);
1573 		if (r >= 0) {
1574 			a += r;
1575 			nbytes -= r;
1576 			buf = (const char *)buf + r;
1577 		}
1578 		if ((size_t)r != n)
1579 			break;
1580 		*bwcount += n;
1581 		if (*bwcount >= BandwidthOpt) {
1582 			gettimeofday(&tv2, NULL);
1583 			usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 +
1584 				(int)(tv2.tv_usec - tv1->tv_usec);
1585 			if (usec >= 0 && usec < 1000000)
1586 				usleep(1000000 - usec);
1587 			gettimeofday(tv1, NULL);
1588 			*bwcount -= BandwidthOpt;
1589 		}
1590 	}
1591 	return(a ? a : r);
1592 }
1593 
1594 /*
1595  * Get a yes or no answer from the terminal.  The program may be run as
1596  * part of a two-way pipe so we cannot use stdin for this operation.
1597  */
1598 static int
1599 getyntty(void)
1600 {
1601 	char buf[256];
1602 	FILE *fp;
1603 	int result;
1604 
1605 	fp = fopen("/dev/tty", "r");
1606 	if (fp == NULL) {
1607 		fprintf(stderr, "No terminal for response\n");
1608 		return(-1);
1609 	}
1610 	result = -1;
1611 	while (fgets(buf, sizeof(buf), fp) != NULL) {
1612 		if (buf[0] == 'y' || buf[0] == 'Y') {
1613 			result = 1;
1614 			break;
1615 		}
1616 		if (buf[0] == 'n' || buf[0] == 'N') {
1617 			result = 0;
1618 			break;
1619 		}
1620 		fprintf(stderr, "Response not understood\n");
1621 		break;
1622 	}
1623 	fclose(fp);
1624 	return(result);
1625 }
1626 
1627 static void
1628 score_printf(size_t i, size_t w, const char *ctl, ...)
1629 {
1630 	va_list va;
1631 	size_t n;
1632 	static size_t SSize;
1633 	static int SFd = -1;
1634 	static char ScoreBuf[1024];
1635 
1636 	if (ScoreBoardFile == NULL)
1637 		return;
1638 	assert(i + w < sizeof(ScoreBuf));
1639 	if (SFd < 0) {
1640 		SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644);
1641 		if (SFd < 0)
1642 			return;
1643 		SSize = 0;
1644 	}
1645 	for (n = 0; n < i; ++n) {
1646 		if (ScoreBuf[n] == 0)
1647 			ScoreBuf[n] = ' ';
1648 	}
1649 	va_start(va, ctl);
1650 	vsnprintf(ScoreBuf + i, w - 1, ctl, va);
1651 	va_end(va);
1652 	n = strlen(ScoreBuf + i);
1653 	while (n < w - 1) {
1654 		ScoreBuf[i + n] = ' ';
1655 		++n;
1656 	}
1657 	ScoreBuf[i + n] = '\n';
1658 	if (SSize < i + w)
1659 		SSize = i + w;
1660 	pwrite(SFd, ScoreBuf, SSize, 0);
1661 }
1662 
1663 static void
1664 hammer_check_restrict(const char *filesystem)
1665 {
1666 	size_t rlen;
1667 	int atslash;
1668 
1669 	if (RestrictTarget == NULL)
1670 		return;
1671 	rlen = strlen(RestrictTarget);
1672 	if (strncmp(filesystem, RestrictTarget, rlen) != 0)
1673 		errx(1, "hammer-remote: restricted target");
1674 
1675 	atslash = 1;
1676 	while (filesystem[rlen]) {
1677 		if (atslash &&
1678 		    filesystem[rlen] == '.' &&
1679 		    filesystem[rlen+1] == '.') {
1680 			errx(1, "hammer-remote: '..' not allowed");
1681 		}
1682 		if (filesystem[rlen] == '/')
1683 			atslash = 1;
1684 		else
1685 			atslash = 0;
1686 		++rlen;
1687 	}
1688 }
1689 
/*
 * Print the synopsis of the mirroring commands to stderr and exit with
 * status 'code'.
 */
static void
mirror_usage(int code)
{
	static const char usage[] =
	    "hammer mirror-read <filesystem> [begin-tid]\n"
	    "hammer mirror-read-stream <filesystem> [begin-tid]\n"
	    "hammer mirror-write <filesystem>\n"
	    "hammer mirror-dump [header]\n"
	    "hammer mirror-copy [[user@]host:]<filesystem>"
			      " [[user@]host:]<filesystem>\n"
	    "hammer mirror-stream [[user@]host:]<filesystem>"
				" [[user@]host:]<filesystem>\n";

	fputs(usage, stderr);
	exit(code);
}
1705 
1706