1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 35 #include "hammer.h" 36 37 #define SERIALBUF_SIZE (512 * 1024) 38 39 typedef struct histogram { 40 hammer_tid_t tid; 41 u_int64_t bytes; 42 } *histogram_t; 43 44 static int read_mrecords(int fd, char *buf, u_int size, 45 hammer_ioc_mrecord_head_t pickup); 46 static int generate_histogram(int fd, const char *filesystem, 47 histogram_t *histogram_ary, 48 struct hammer_ioc_mirror_rw *mirror_base, 49 int *repeatp); 50 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 51 hammer_ioc_mrecord_head_t pickup); 52 static void write_mrecord(int fdout, u_int32_t type, 53 hammer_ioc_mrecord_any_t mrec, int bytes); 54 static void generate_mrec_header(int fd, int pfs_id, 55 union hammer_ioc_mrecord_any *mrec_tmp); 56 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 57 struct hammer_ioc_mrecord_head *pickup, 58 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 59 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 60 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 61 u_int64_t *bwcount, struct timeval *tv1); 62 static int getyn(void); 63 static void mirror_usage(int code); 64 65 /* 66 * Generate a mirroring data stream from the specific source over the 67 * entire key range, but restricted to the specified transaction range. 68 * 69 * The HAMMER VFS does most of the work, we add a few new mrecord 70 * types to negotiate the TID ranges and verify that the entire 71 * stream made it to the destination. 72 * 73 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 74 * set up a fake value of -1 when running the histogram for mirror-read. 75 */ 76 void 77 hammer_cmd_mirror_read(char **av, int ac, int streaming) 78 { 79 struct hammer_ioc_mirror_rw mirror; 80 struct hammer_ioc_pseudofs_rw pfs; 81 union hammer_ioc_mrecord_any mrec_tmp; 82 struct hammer_ioc_mrecord_head pickup; 83 hammer_ioc_mrecord_any_t mrec; 84 hammer_tid_t sync_tid; 85 histogram_t histogram_ary; 86 const char *filesystem; 87 char *buf = malloc(SERIALBUF_SIZE); 88 int interrupted = 0; 89 int error; 90 int fd; 91 int n; 92 int didwork; 93 int histogram; 94 int histindex; 95 int histmax; 96 int repeat = 0; 97 int sameline; 98 int64_t total_bytes; 99 time_t base_t = time(NULL); 100 struct timeval bwtv; 101 u_int64_t bwcount; 102 u_int64_t estbytes; 103 104 if (ac == 0 || ac > 2) 105 mirror_usage(1); 106 filesystem = av[0]; 107 108 pickup.signature = 0; 109 pickup.type = 0; 110 histogram = 0; 111 histindex = 0; 112 histmax = 0; 113 histogram_ary = NULL; 114 sameline = 0; 115 116 again: 117 bzero(&mirror, sizeof(mirror)); 118 hammer_key_beg_init(&mirror.key_beg); 119 hammer_key_end_init(&mirror.key_end); 120 121 fd = getpfs(&pfs, filesystem); 122 123 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 124 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 125 fflush(stderr); 126 sameline = 1; 127 } 128 sameline = 1; 129 total_bytes = 0; 130 gettimeofday(&bwtv, NULL); 131 bwcount = 0; 132 133 /* 134 * Send initial header for the purpose of determining the 135 * shared-uuid. 136 */ 137 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 138 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 139 &mrec_tmp, sizeof(mrec_tmp.pfs)); 140 141 /* 142 * In 2-way mode the target will send us a PFS info packet 143 * first. Use the target's current snapshot TID as our default 144 * begin TID. 145 */ 146 if (TwoWayPipeOpt) { 147 mirror.tid_beg = 0; 148 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 149 NULL, &mirror.tid_beg); 150 if (n < 0) { /* got TERM record */ 151 relpfs(fd, &pfs); 152 return; 153 } 154 ++mirror.tid_beg; 155 } else if (streaming && histogram) { 156 mirror.tid_beg = histogram_ary[histindex].tid + 1; 157 } else { 158 mirror.tid_beg = 0; 159 } 160 161 /* 162 * Write out the PFS header, tid_beg will be updated if our PFS 163 * has a larger begin sync. tid_end is set to the latest source 164 * TID whos flush cycle has completed. 165 */ 166 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 167 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 168 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 169 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 170 mirror.ubuf = buf; 171 mirror.size = SERIALBUF_SIZE; 172 mirror.pfs_id = pfs.pfs_id; 173 mirror.shared_uuid = pfs.ondisk->shared_uuid; 174 175 /* 176 * XXX If the histogram is exhausted and the TID delta is large 177 * the stream might have been offline for a while and is 178 * now picking it up again. Do another histogram. 179 */ 180 #if 0 181 if (streaming && histogram && histindex == histend) { 182 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 183 histogram = 0; 184 } 185 #endif 186 187 /* 188 * Initial bulk startup control, try to do some incremental 189 * mirroring in order to allow the stream to be killed and 190 * restarted without having to start over. 191 */ 192 if (histogram == 0 && BulkOpt == 0) { 193 if (VerboseOpt && repeat == 0) { 194 fprintf(stderr, "\n"); 195 sameline = 0; 196 } 197 histmax = generate_histogram(fd, filesystem, 198 &histogram_ary, &mirror, 199 &repeat); 200 histindex = 0; 201 histogram = 1; 202 203 /* 204 * Just stream the histogram, then stop 205 */ 206 if (streaming == 0) 207 streaming = -1; 208 } 209 210 if (streaming && histogram) { 211 ++histindex; 212 mirror.tid_end = histogram_ary[histindex].tid; 213 estbytes = histogram_ary[histindex-1].bytes; 214 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 215 } else { 216 estbytes = 0; 217 } 218 219 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 220 &mrec_tmp, sizeof(mrec_tmp.pfs)); 221 222 /* 223 * A cycle file overrides the beginning TID only if we are 224 * not operating in two-way or histogram mode. 225 */ 226 if (TwoWayPipeOpt == 0 && histogram == 0) { 227 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 228 } 229 230 /* 231 * An additional argument overrides the beginning TID regardless 232 * of what mode we are in. This is not recommending if operating 233 * in two-way mode. 234 */ 235 if (ac == 2) 236 mirror.tid_beg = strtoull(av[1], NULL, 0); 237 238 if (streaming == 0 || VerboseOpt >= 2) { 239 fprintf(stderr, 240 "Mirror-read: Mirror %016jx to %016jx", 241 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 242 if (histogram) 243 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 244 fprintf(stderr, "\n"); 245 fflush(stderr); 246 } 247 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 248 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 249 (uintmax_t)mirror.key_beg.obj_id); 250 } 251 252 /* 253 * Nothing to do if begin equals end. 254 */ 255 if (mirror.tid_beg >= mirror.tid_end) { 256 if (streaming == 0 || VerboseOpt >= 2) 257 fprintf(stderr, "Mirror-read: No work to do\n"); 258 sleep(DelayOpt); 259 didwork = 0; 260 histogram = 0; 261 goto done; 262 } 263 didwork = 1; 264 265 /* 266 * Write out bulk records 267 */ 268 mirror.ubuf = buf; 269 mirror.size = SERIALBUF_SIZE; 270 271 do { 272 mirror.count = 0; 273 mirror.pfs_id = pfs.pfs_id; 274 mirror.shared_uuid = pfs.ondisk->shared_uuid; 275 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 276 fprintf(stderr, "Mirror-read %s failed: %s\n", 277 filesystem, strerror(errno)); 278 exit(1); 279 } 280 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 281 fprintf(stderr, 282 "Mirror-read %s fatal error %d\n", 283 filesystem, mirror.head.error); 284 exit(1); 285 } 286 if (mirror.count) { 287 if (BandwidthOpt) { 288 n = writebw(1, mirror.ubuf, mirror.count, 289 &bwcount, &bwtv); 290 } else { 291 n = write(1, mirror.ubuf, mirror.count); 292 } 293 if (n != mirror.count) { 294 fprintf(stderr, "Mirror-read %s failed: " 295 "short write\n", 296 filesystem); 297 exit(1); 298 } 299 } 300 total_bytes += mirror.count; 301 if (streaming && VerboseOpt) { 302 fprintf(stderr, 303 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 304 (uintmax_t)mirror.key_cur.obj_id, 305 (uintmax_t)mirror.tid_beg, 306 (uintmax_t)mirror.tid_end, 307 (intmax_t)total_bytes); 308 fflush(stderr); 309 sameline = 0; 310 } 311 mirror.key_beg = mirror.key_cur; 312 313 /* 314 * Deal with time limit option 315 */ 316 if (TimeoutOpt && 317 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 318 fprintf(stderr, 319 "Mirror-read %s interrupted by timer at" 320 " %016jx\n", 321 filesystem, 322 (uintmax_t)mirror.key_cur.obj_id); 323 interrupted = 1; 324 break; 325 } 326 } while (mirror.count != 0); 327 328 done: 329 if (streaming && VerboseOpt && sameline == 0) { 330 fprintf(stderr, "\n"); 331 fflush(stderr); 332 sameline = 1; 333 } 334 335 /* 336 * Write out the termination sync record - only if not interrupted 337 */ 338 if (interrupted == 0) { 339 if (didwork) { 340 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 341 &mrec_tmp, sizeof(mrec_tmp.sync)); 342 } else { 343 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 344 &mrec_tmp, sizeof(mrec_tmp.sync)); 345 } 346 } 347 348 /* 349 * If the -2 option was given (automatic when doing mirror-copy), 350 * a two-way pipe is assumed and we expect a response mrec from 351 * the target. 352 */ 353 if (TwoWayPipeOpt) { 354 mrec = read_mrecord(0, &error, &pickup); 355 if (mrec == NULL || 356 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 357 mrec->head.rec_size != sizeof(mrec->update)) { 358 fprintf(stderr, "mirror_read: Did not get final " 359 "acknowledgement packet from target\n"); 360 exit(1); 361 } 362 if (interrupted) { 363 if (CyclePath) { 364 hammer_set_cycle(&mirror.key_cur, 365 mirror.tid_beg); 366 fprintf(stderr, "Cyclefile %s updated for " 367 "continuation\n", CyclePath); 368 } 369 } else { 370 sync_tid = mrec->update.tid; 371 if (CyclePath) { 372 hammer_key_beg_init(&mirror.key_beg); 373 hammer_set_cycle(&mirror.key_beg, sync_tid); 374 fprintf(stderr, 375 "Cyclefile %s updated to 0x%016jx\n", 376 CyclePath, (uintmax_t)sync_tid); 377 } 378 } 379 } else if (CyclePath) { 380 /* NOTE! mirror.tid_beg cannot be updated */ 381 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 382 "fully updated unless you use mirror-copy\n"); 383 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 384 } 385 if (streaming && interrupted == 0) { 386 time_t t1 = time(NULL); 387 time_t t2; 388 389 /* 390 * Try to break down large bulk transfers into smaller ones 391 * so it can sync the transaction id on the slave. This 392 * way if we get interrupted a restart doesn't have to 393 * start from scratch. 394 */ 395 if (streaming && histogram) { 396 if (histindex != histmax) { 397 if (VerboseOpt && VerboseOpt < 2 && 398 streaming >= 0) { 399 fprintf(stderr, " (bulk incremental)"); 400 } 401 relpfs(fd, &pfs); 402 goto again; 403 } 404 } 405 406 if (VerboseOpt && streaming >= 0) { 407 fprintf(stderr, " W"); 408 fflush(stderr); 409 } 410 pfs.ondisk->sync_end_tid = mirror.tid_end; 411 if (streaming < 0) { 412 /* 413 * Fake streaming mode when using a histogram to 414 * break up a mirror-read, do not wait on source. 415 */ 416 streaming = 0; 417 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 418 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 419 filesystem, strerror(errno)); 420 } else { 421 t2 = time(NULL) - t1; 422 if (t2 >= 0 && t2 < DelayOpt) { 423 if (VerboseOpt) { 424 fprintf(stderr, "\bD"); 425 fflush(stderr); 426 } 427 sleep(DelayOpt - t2); 428 } 429 if (VerboseOpt) { 430 fprintf(stderr, "\b "); 431 fflush(stderr); 432 } 433 relpfs(fd, &pfs); 434 goto again; 435 } 436 } 437 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 438 &mrec_tmp, sizeof(mrec_tmp.sync)); 439 relpfs(fd, &pfs); 440 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 441 } 442 443 /* 444 * What we are trying to do here is figure out how much data is 445 * going to be sent for the TID range and to break the TID range 446 * down into reasonably-sized slices (from the point of view of 447 * data sent) so a lost connection can restart at a reasonable 448 * place and not all the way back at the beginning. 449 * 450 * An entry's TID serves as the end_tid for the prior entry 451 * So we have to offset the calculation by 1 so that TID falls into 452 * the previous entry when populating entries. 453 * 454 * Because the transaction id space is bursty we need a relatively 455 * large number of buckets (like a million) to do a reasonable job 456 * for things like an initial bulk mirrors on a very large filesystem. 457 */ 458 #define HIST_COUNT (1024 * 1024) 459 460 static int 461 generate_histogram(int fd, const char *filesystem, 462 histogram_t *histogram_ary, 463 struct hammer_ioc_mirror_rw *mirror_base, 464 int *repeatp) 465 { 466 struct hammer_ioc_mirror_rw mirror; 467 union hammer_ioc_mrecord_any *mrec; 468 hammer_tid_t tid_beg; 469 hammer_tid_t tid_end; 470 hammer_tid_t tid; 471 hammer_tid_t tidx; 472 u_int64_t *tid_bytes; 473 u_int64_t total; 474 u_int64_t accum; 475 int i; 476 int res; 477 int off; 478 int len; 479 480 mirror = *mirror_base; 481 tid_beg = mirror.tid_beg; 482 tid_end = mirror.tid_end; 483 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 484 485 if (*histogram_ary == NULL) { 486 *histogram_ary = malloc(sizeof(struct histogram) * 487 (HIST_COUNT + 2)); 488 } 489 if (tid_beg >= tid_end) 490 return(0); 491 492 /* needs 2 extra */ 493 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 494 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 495 496 if (*repeatp == 0) { 497 fprintf(stderr, "Prescan to break up bulk transfer"); 498 if (VerboseOpt > 1) 499 fprintf(stderr, " (%juMB chunks)", 500 (uintmax_t)(SplitupOpt / (1024 * 1024))); 501 fprintf(stderr, "\n"); 502 } 503 504 /* 505 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 506 * 507 * Note: Estimates can be off when the mirror is way behind due 508 * to skips. 509 */ 510 total = 0; 511 accum = 0; 512 for (;;) { 513 mirror.count = 0; 514 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 515 fprintf(stderr, "Mirror-read %s failed: %s\n", 516 filesystem, strerror(errno)); 517 exit(1); 518 } 519 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 520 fprintf(stderr, 521 "Mirror-read %s fatal error %d\n", 522 filesystem, mirror.head.error); 523 exit(1); 524 } 525 for (off = 0; 526 off < mirror.count; 527 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size) 528 ) { 529 mrec = (void *)((char *)mirror.ubuf + off); 530 531 /* 532 * We only care about general RECs and PASS 533 * records. We ignore SKIPs. 534 */ 535 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 536 case HAMMER_MREC_TYPE_REC: 537 case HAMMER_MREC_TYPE_PASS: 538 break; 539 default: 540 continue; 541 } 542 543 /* 544 * Calculate for two indices, create_tid and 545 * delete_tid. Record data only applies to 546 * the create_tid. 547 * 548 * When tid is exactly on the boundary it really 549 * belongs to the previous entry because scans 550 * are inclusive of the ending entry. 551 */ 552 tid = mrec->rec.leaf.base.delete_tid; 553 if (tid && tid >= tid_beg && tid <= tid_end) { 554 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 555 if (mrec->head.type == 556 HAMMER_MREC_TYPE_REC) { 557 len -= HAMMER_HEAD_DOALIGN( 558 mrec->rec.leaf.data_len); 559 assert(len > 0); 560 } 561 i = (tid - tid_beg) * HIST_COUNT / 562 (tid_end - tid_beg); 563 tidx = tid_beg + i * (tid_end - tid_beg) / 564 HIST_COUNT; 565 if (tid == tidx && i) 566 --i; 567 assert(i >= 0 && i < HIST_COUNT); 568 tid_bytes[i] += len; 569 total += len; 570 accum += len; 571 } 572 573 tid = mrec->rec.leaf.base.create_tid; 574 if (tid && tid >= tid_beg && tid <= tid_end) { 575 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 576 if (mrec->head.type == 577 HAMMER_MREC_TYPE_REC_NODATA) { 578 len += HAMMER_HEAD_DOALIGN( 579 mrec->rec.leaf.data_len); 580 } 581 i = (tid - tid_beg) * HIST_COUNT / 582 (tid_end - tid_beg); 583 tidx = tid_beg + i * (tid_end - tid_beg) / 584 HIST_COUNT; 585 if (tid == tidx && i) 586 --i; 587 assert(i >= 0 && i < HIST_COUNT); 588 tid_bytes[i] += len; 589 total += len; 590 accum += len; 591 } 592 } 593 if (VerboseOpt > 1) { 594 if (*repeatp == 0 && accum > SplitupOpt) { 595 fprintf(stderr, "."); 596 fflush(stderr); 597 accum = 0; 598 } 599 } 600 if (mirror.count == 0) 601 break; 602 mirror.key_beg = mirror.key_cur; 603 } 604 605 /* 606 * Reduce to SplitupOpt (default 4GB) chunks. This code may 607 * use up to two additional elements. Do the array in-place. 608 * 609 * Inefficient degenerate cases can occur if we do not accumulate 610 * at least the requested split amount, so error on the side of 611 * going over a bit. 612 */ 613 res = 0; 614 (*histogram_ary)[res].tid = tid_beg; 615 (*histogram_ary)[res].bytes = tid_bytes[0]; 616 for (i = 1; i < HIST_COUNT; ++i) { 617 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 618 ++res; 619 (*histogram_ary)[res].tid = tid_beg + 620 i * (tid_end - tid_beg) / 621 HIST_COUNT; 622 (*histogram_ary)[res].bytes = 0; 623 624 } 625 (*histogram_ary)[res].bytes += tid_bytes[i]; 626 } 627 ++res; 628 (*histogram_ary)[res].tid = tid_end; 629 (*histogram_ary)[res].bytes = -1; 630 631 if (*repeatp == 0) { 632 if (VerboseOpt > 1) 633 fprintf(stderr, "\n"); /* newline after ... */ 634 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 635 res, (uintmax_t)total / (1024 * 1024)); 636 for (i = 0; i < res && i < 3; ++i) { 637 if (i) 638 fprintf(stderr, ", "); 639 fprintf(stderr, "%ju", 640 (uintmax_t)(*histogram_ary)[i].bytes); 641 } 642 if (i < res) 643 fprintf(stderr, ", ..."); 644 fprintf(stderr, ")\n"); 645 } 646 assert(res <= HIST_COUNT); 647 *repeatp = 1; 648 649 free(tid_bytes); 650 return(res); 651 } 652 653 static void 654 create_pfs(const char *filesystem, uuid_t *s_uuid) 655 { 656 if (ForceYesOpt == 1) { 657 fprintf(stderr, "PFS slave %s does not exist. " 658 "Auto create new slave PFS!\n", filesystem); 659 660 } else { 661 fprintf(stderr, "PFS slave %s does not exist.\n" 662 "Do you want to create a new slave PFS? (yes|no) ", 663 filesystem); 664 fflush(stderr); 665 if (getyn() != 1) { 666 fprintf(stderr, "Aborting operation\n"); 667 exit(1); 668 } 669 } 670 671 u_int32_t status; 672 char *shared_uuid = NULL; 673 uuid_to_string(s_uuid, &shared_uuid, &status); 674 675 char *cmd = NULL; 676 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 677 filesystem, shared_uuid); 678 free(shared_uuid); 679 680 if (cmd == NULL) { 681 fprintf(stderr, "Failed to alloc memory\n"); 682 exit(1); 683 } 684 if (system(cmd) != 0) { 685 fprintf(stderr, "Failed to create PFS\n"); 686 } 687 free(cmd); 688 } 689 690 /* 691 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 692 * some additional packet types to negotiate TID ranges and to verify 693 * completion. The HAMMER VFS does most of the work. 694 * 695 * It is important to note that the mirror.key_{beg,end} range must 696 * match the ranged used by the original. For now both sides use 697 * range the entire key space. 698 * 699 * It is even more important that the records in the stream conform 700 * to the TID range also supplied in the stream. The HAMMER VFS will 701 * use the REC, PASS, and SKIP record types to track the portions of 702 * the B-Tree being scanned in order to be able to proactively delete 703 * records on the target within those active areas that are not mentioned 704 * by the source. 705 * 706 * The mirror.key_cur field is used by the VFS to do this tracking. It 707 * must be initialized to key_beg but then is persistently updated by 708 * the HAMMER VFS on each successive ioctl() call. If you blow up this 709 * field you will blow up the mirror target, possibly to the point of 710 * deleting everything. As a safety measure the HAMMER VFS simply marks 711 * the records that the source has destroyed as deleted on the target, 712 * and normal pruning operations will deal with their final disposition 713 * at some later time. 714 */ 715 void 716 hammer_cmd_mirror_write(char **av, int ac) 717 { 718 struct hammer_ioc_mirror_rw mirror; 719 const char *filesystem; 720 char *buf = malloc(SERIALBUF_SIZE); 721 struct hammer_ioc_pseudofs_rw pfs; 722 struct hammer_ioc_mrecord_head pickup; 723 struct hammer_ioc_synctid synctid; 724 union hammer_ioc_mrecord_any mrec_tmp; 725 hammer_ioc_mrecord_any_t mrec; 726 struct stat st; 727 int error; 728 int fd; 729 int n; 730 731 if (ac != 1) 732 mirror_usage(1); 733 filesystem = av[0]; 734 735 pickup.signature = 0; 736 pickup.type = 0; 737 738 again: 739 bzero(&mirror, sizeof(mirror)); 740 hammer_key_beg_init(&mirror.key_beg); 741 hammer_key_end_init(&mirror.key_end); 742 mirror.key_end = mirror.key_beg; 743 744 /* 745 * Read initial packet 746 */ 747 mrec = read_mrecord(0, &error, &pickup); 748 if (mrec == NULL) { 749 if (error == 0) 750 fprintf(stderr, "validate_mrec_header: short read\n"); 751 exit(1); 752 } 753 /* 754 * Validate packet 755 */ 756 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 757 return; 758 } 759 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 760 fprintf(stderr, "validate_mrec_header: did not get expected " 761 "PFSD record type\n"); 762 exit(1); 763 } 764 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 765 fprintf(stderr, "validate_mrec_header: unexpected payload " 766 "size\n"); 767 exit(1); 768 } 769 770 /* 771 * Create slave PFS if it doesn't yet exist 772 */ 773 if (lstat(filesystem, &st) != 0) { 774 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 775 } 776 free(mrec); 777 mrec = NULL; 778 779 fd = getpfs(&pfs, filesystem); 780 781 /* 782 * In two-way mode the target writes out a PFS packet first. 783 * The source uses our tid_end as its tid_beg by default, 784 * picking up where it left off. 785 */ 786 mirror.tid_beg = 0; 787 if (TwoWayPipeOpt) { 788 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 789 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 790 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 791 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 792 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 793 &mrec_tmp, sizeof(mrec_tmp.pfs)); 794 } 795 796 /* 797 * Read and process the PFS header. The source informs us of 798 * the TID range the stream represents. 799 */ 800 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 801 &mirror.tid_beg, &mirror.tid_end); 802 if (n < 0) { /* got TERM record */ 803 relpfs(fd, &pfs); 804 return; 805 } 806 807 mirror.ubuf = buf; 808 mirror.size = SERIALBUF_SIZE; 809 810 /* 811 * Read and process bulk records (REC, PASS, and SKIP types). 812 * 813 * On your life, do NOT mess with mirror.key_cur or your mirror 814 * target may become history. 815 */ 816 for (;;) { 817 mirror.count = 0; 818 mirror.pfs_id = pfs.pfs_id; 819 mirror.shared_uuid = pfs.ondisk->shared_uuid; 820 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 821 if (mirror.size <= 0) 822 break; 823 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 824 fprintf(stderr, "Mirror-write %s failed: %s\n", 825 filesystem, strerror(errno)); 826 exit(1); 827 } 828 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 829 fprintf(stderr, 830 "Mirror-write %s fatal error %d\n", 831 filesystem, mirror.head.error); 832 exit(1); 833 } 834 #if 0 835 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 836 fprintf(stderr, 837 "Mirror-write %s interrupted by timer at" 838 " %016llx\n", 839 filesystem, 840 mirror.key_cur.obj_id); 841 exit(0); 842 } 843 #endif 844 } 845 846 /* 847 * Read and process the termination sync record. 848 */ 849 mrec = read_mrecord(0, &error, &pickup); 850 851 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 852 fprintf(stderr, "Mirror-write: received termination request\n"); 853 free(mrec); 854 return; 855 } 856 857 if (mrec == NULL || 858 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 859 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 860 mrec->head.rec_size != sizeof(mrec->sync)) { 861 fprintf(stderr, "Mirror-write %s: Did not get termination " 862 "sync record, or rec_size is wrong rt=%d\n", 863 filesystem, mrec->head.type); 864 exit(1); 865 } 866 867 /* 868 * Update the PFS info on the target so the user has visibility 869 * into the new snapshot, and sync the target filesystem. 870 */ 871 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 872 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 873 874 bzero(&synctid, sizeof(synctid)); 875 synctid.op = HAMMER_SYNCTID_SYNC2; 876 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 877 878 if (VerboseOpt >= 2) { 879 fprintf(stderr, "Mirror-write %s: succeeded\n", 880 filesystem); 881 } 882 } 883 884 free(mrec); 885 mrec = NULL; 886 887 /* 888 * Report back to the originator. 889 */ 890 if (TwoWayPipeOpt) { 891 mrec_tmp.update.tid = mirror.tid_end; 892 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 893 &mrec_tmp, sizeof(mrec_tmp.update)); 894 } else { 895 printf("Source can update synctid to 0x%016jx\n", 896 (uintmax_t)mirror.tid_end); 897 } 898 relpfs(fd, &pfs); 899 goto again; 900 } 901 902 void 903 hammer_cmd_mirror_dump(void) 904 { 905 char *buf = malloc(SERIALBUF_SIZE); 906 struct hammer_ioc_mrecord_head pickup; 907 hammer_ioc_mrecord_any_t mrec; 908 int error; 909 int size; 910 int offset; 911 int bytes; 912 913 /* 914 * Read and process the PFS header 915 */ 916 pickup.signature = 0; 917 pickup.type = 0; 918 919 mrec = read_mrecord(0, &error, &pickup); 920 921 again: 922 /* 923 * Read and process bulk records 924 */ 925 for (;;) { 926 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 927 if (size <= 0) 928 break; 929 offset = 0; 930 while (offset < size) { 931 mrec = (void *)((char *)buf + offset); 932 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 933 if (offset + bytes > size) { 934 fprintf(stderr, "Misaligned record\n"); 935 exit(1); 936 } 937 938 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 939 case HAMMER_MREC_TYPE_REC_BADCRC: 940 case HAMMER_MREC_TYPE_REC: 941 printf("Record obj=%016jx key=%016jx " 942 "rt=%02x ot=%02x", 943 (uintmax_t)mrec->rec.leaf.base.obj_id, 944 (uintmax_t)mrec->rec.leaf.base.key, 945 mrec->rec.leaf.base.rec_type, 946 mrec->rec.leaf.base.obj_type); 947 if (mrec->head.type == 948 HAMMER_MREC_TYPE_REC_BADCRC) { 949 printf(" (BAD CRC)"); 950 } 951 printf("\n"); 952 printf(" tids %016jx:%016jx data=%d\n", 953 (uintmax_t)mrec->rec.leaf.base.create_tid, 954 (uintmax_t)mrec->rec.leaf.base.delete_tid, 955 mrec->rec.leaf.data_len); 956 break; 957 case HAMMER_MREC_TYPE_PASS: 958 printf("Pass obj=%016jx key=%016jx " 959 "rt=%02x ot=%02x\n", 960 (uintmax_t)mrec->rec.leaf.base.obj_id, 961 (uintmax_t)mrec->rec.leaf.base.key, 962 mrec->rec.leaf.base.rec_type, 963 mrec->rec.leaf.base.obj_type); 964 printf(" tids %016jx:%016jx data=%d\n", 965 (uintmax_t)mrec->rec.leaf.base.create_tid, 966 (uintmax_t)mrec->rec.leaf.base.delete_tid, 967 mrec->rec.leaf.data_len); 968 break; 969 case HAMMER_MREC_TYPE_SKIP: 970 printf("Skip obj=%016jx key=%016jx rt=%02x to\n" 971 " obj=%016jx key=%016jx rt=%02x\n", 972 (uintmax_t)mrec->skip.skip_beg.obj_id, 973 (uintmax_t)mrec->skip.skip_beg.key, 974 mrec->skip.skip_beg.rec_type, 975 (uintmax_t)mrec->skip.skip_end.obj_id, 976 (uintmax_t)mrec->skip.skip_end.key, 977 mrec->skip.skip_end.rec_type); 978 default: 979 break; 980 } 981 offset += bytes; 982 } 983 } 984 985 /* 986 * Read and process the termination sync record. 987 */ 988 mrec = read_mrecord(0, &error, &pickup); 989 if (mrec == NULL || 990 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 991 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 992 ) { 993 fprintf(stderr, "Mirror-dump: Did not get termination " 994 "sync record\n"); 995 } 996 997 /* 998 * Continue with more batches until EOF. 999 */ 1000 mrec = read_mrecord(0, &error, &pickup); 1001 if (mrec) 1002 goto again; 1003 } 1004 1005 void 1006 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1007 { 1008 pid_t pid1; 1009 pid_t pid2; 1010 int fds[2]; 1011 const char *xav[32]; 1012 char tbuf[16]; 1013 char *ptr; 1014 int xac; 1015 1016 if (ac != 2) 1017 mirror_usage(1); 1018 1019 TwoWayPipeOpt = 1; 1020 signal(SIGPIPE, SIG_IGN); 1021 1022 again: 1023 if (pipe(fds) < 0) { 1024 perror("pipe"); 1025 exit(1); 1026 } 1027 1028 /* 1029 * Source 1030 */ 1031 if ((pid1 = fork()) == 0) { 1032 signal(SIGPIPE, SIG_DFL); 1033 dup2(fds[0], 0); 1034 dup2(fds[0], 1); 1035 close(fds[0]); 1036 close(fds[1]); 1037 if ((ptr = strchr(av[0], ':')) != NULL) { 1038 *ptr++ = 0; 1039 xac = 0; 1040 xav[xac++] = "ssh"; 1041 if (CompressOpt) 1042 xav[xac++] = "-C"; 1043 if (SshPort) { 1044 xav[xac++] = "-p"; 1045 xav[xac++] = SshPort; 1046 } 1047 xav[xac++] = av[0]; 1048 xav[xac++] = "hammer"; 1049 1050 switch(VerboseOpt) { 1051 case 0: 1052 break; 1053 case 1: 1054 xav[xac++] = "-v"; 1055 break; 1056 case 2: 1057 xav[xac++] = "-vv"; 1058 break; 1059 default: 1060 xav[xac++] = "-vvv"; 1061 break; 1062 } 1063 if (ForceYesOpt) { 1064 xav[xac++] = "-y"; 1065 } 1066 xav[xac++] = "-2"; 1067 if (TimeoutOpt) { 1068 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1069 xav[xac++] = "-t"; 1070 xav[xac++] = tbuf; 1071 } 1072 if (SplitupOptStr) { 1073 xav[xac++] = "-S"; 1074 xav[xac++] = SplitupOptStr; 1075 } 1076 if (streaming) 1077 xav[xac++] = "mirror-read-stream"; 1078 else 1079 xav[xac++] = "mirror-read"; 1080 xav[xac++] = ptr; 1081 xav[xac++] = NULL; 1082 execv("/usr/bin/ssh", (void *)xav); 1083 } else { 1084 hammer_cmd_mirror_read(av, 1, streaming); 1085 fflush(stdout); 1086 fflush(stderr); 1087 } 1088 _exit(1); 1089 } 1090 1091 /* 1092 * Target 1093 */ 1094 if ((pid2 = fork()) == 0) { 1095 signal(SIGPIPE, SIG_DFL); 1096 dup2(fds[1], 0); 1097 dup2(fds[1], 1); 1098 close(fds[0]); 1099 close(fds[1]); 1100 if ((ptr = strchr(av[1], ':')) != NULL) { 1101 *ptr++ = 0; 1102 xac = 0; 1103 xav[xac++] = "ssh"; 1104 if (CompressOpt) 1105 xav[xac++] = "-C"; 1106 if (SshPort) { 1107 xav[xac++] = "-p"; 1108 xav[xac++] = SshPort; 1109 } 1110 xav[xac++] = av[1]; 1111 xav[xac++] = "hammer"; 1112 1113 switch(VerboseOpt) { 1114 case 0: 1115 break; 1116 case 1: 1117 xav[xac++] = "-v"; 1118 break; 1119 case 2: 1120 xav[xac++] = "-vv"; 1121 break; 1122 default: 1123 xav[xac++] = "-vvv"; 1124 break; 1125 } 1126 if (ForceYesOpt) { 1127 xav[xac++] = "-y"; 1128 } 1129 xav[xac++] = "-2"; 1130 xav[xac++] = "mirror-write"; 1131 xav[xac++] = ptr; 1132 xav[xac++] = NULL; 1133 execv("/usr/bin/ssh", (void *)xav); 1134 } else { 1135 hammer_cmd_mirror_write(av + 1, 1); 1136 fflush(stdout); 1137 fflush(stderr); 1138 } 1139 _exit(1); 1140 } 1141 close(fds[0]); 1142 close(fds[1]); 1143 1144 while (waitpid(pid1, NULL, 0) <= 0) 1145 ; 1146 while (waitpid(pid2, NULL, 0) <= 0) 1147 ; 1148 1149 /* 1150 * If the link is lost restart 1151 */ 1152 if (streaming) { 1153 if (VerboseOpt) { 1154 fprintf(stderr, "\nLost Link\n"); 1155 fflush(stderr); 1156 } 1157 sleep(15 + DelayOpt); 1158 goto again; 1159 } 1160 1161 } 1162 1163 /* 1164 * Read and return multiple mrecords 1165 */ 1166 static int 1167 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1168 { 1169 hammer_ioc_mrecord_any_t mrec; 1170 u_int count; 1171 size_t n; 1172 size_t i; 1173 size_t bytes; 1174 int type; 1175 1176 count = 0; 1177 while (size - count >= HAMMER_MREC_HEADSIZE) { 1178 /* 1179 * Cached the record header in case we run out of buffer 1180 * space. 1181 */ 1182 fflush(stdout); 1183 if (pickup->signature == 0) { 1184 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1185 i = read(fd, (char *)pickup + n, 1186 HAMMER_MREC_HEADSIZE - n); 1187 if (i <= 0) 1188 break; 1189 } 1190 if (n == 0) 1191 break; 1192 if (n != HAMMER_MREC_HEADSIZE) { 1193 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1194 exit(1); 1195 } 1196 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1197 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1198 "bad signature\n"); 1199 exit(1); 1200 } 1201 } 1202 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1203 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1204 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1205 "illegal rec_size\n"); 1206 exit(1); 1207 } 1208 1209 /* 1210 * Stop if we have insufficient space for the record and data. 1211 */ 1212 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1213 if (size - count < bytes) 1214 break; 1215 1216 /* 1217 * Stop if the record type is not a REC, SKIP, or PASS, 1218 * which are the only types the ioctl supports. Other types 1219 * are used only by the userland protocol. 1220 * 1221 * Ignore all flags. 1222 */ 1223 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1224 if (type != HAMMER_MREC_TYPE_PFSD && 1225 type != HAMMER_MREC_TYPE_REC && 1226 type != HAMMER_MREC_TYPE_SKIP && 1227 type != HAMMER_MREC_TYPE_PASS) { 1228 break; 1229 } 1230 1231 /* 1232 * Read the remainder and clear the pickup signature. 1233 */ 1234 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1235 i = read(fd, buf + count + n, bytes - n); 1236 if (i <= 0) 1237 break; 1238 } 1239 if (n != bytes) { 1240 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1241 exit(1); 1242 } 1243 1244 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1245 pickup->signature = 0; 1246 pickup->type = 0; 1247 mrec = (void *)(buf + count); 1248 1249 /* 1250 * Validate the completed record 1251 */ 1252 if (mrec->head.rec_crc != 1253 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1254 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1255 fprintf(stderr, "read_mrecords: malformed record " 1256 "on pipe, bad crc\n"); 1257 exit(1); 1258 } 1259 1260 /* 1261 * If its a B-Tree record validate the data crc. 1262 * 1263 * NOTE: If the VFS passes us an explicitly errorde mrec 1264 * we just pass it through. 1265 */ 1266 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1267 1268 if (type == HAMMER_MREC_TYPE_REC) { 1269 if (mrec->head.rec_size < 1270 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1271 fprintf(stderr, 1272 "read_mrecords: malformed record on " 1273 "pipe, illegal element data_len\n"); 1274 exit(1); 1275 } 1276 if (mrec->rec.leaf.data_len && 1277 mrec->rec.leaf.data_offset && 1278 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1279 fprintf(stderr, 1280 "read_mrecords: data_crc did not " 1281 "match data! obj=%016jx key=%016jx\n", 1282 (uintmax_t)mrec->rec.leaf.base.obj_id, 1283 (uintmax_t)mrec->rec.leaf.base.key); 1284 fprintf(stderr, 1285 "continuing, but there are problems\n"); 1286 } 1287 } 1288 count += bytes; 1289 } 1290 return(count); 1291 } 1292 1293 /* 1294 * Read and return a single mrecord. 1295 */ 1296 static 1297 hammer_ioc_mrecord_any_t 1298 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1299 { 1300 hammer_ioc_mrecord_any_t mrec; 1301 struct hammer_ioc_mrecord_head mrechd; 1302 size_t bytes; 1303 size_t n; 1304 size_t i; 1305 1306 if (pickup && pickup->type != 0) { 1307 mrechd = *pickup; 1308 pickup->signature = 0; 1309 pickup->type = 0; 1310 n = HAMMER_MREC_HEADSIZE; 1311 } else { 1312 /* 1313 * Read in the PFSD header from the sender. 1314 */ 1315 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1316 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1317 if (i <= 0) 1318 break; 1319 } 1320 if (n == 0) { 1321 *errorp = 0; /* EOF */ 1322 return(NULL); 1323 } 1324 if (n != HAMMER_MREC_HEADSIZE) { 1325 fprintf(stderr, "short read of mrecord header\n"); 1326 *errorp = EPIPE; 1327 return(NULL); 1328 } 1329 } 1330 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1331 fprintf(stderr, "read_mrecord: bad signature\n"); 1332 *errorp = EINVAL; 1333 return(NULL); 1334 } 1335 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1336 assert(bytes >= sizeof(mrechd)); 1337 mrec = malloc(bytes); 1338 mrec->head = mrechd; 1339 1340 while (n < bytes) { 1341 i = read(fdin, (char *)mrec + n, bytes - n); 1342 if (i <= 0) 1343 break; 1344 n += i; 1345 } 1346 if (n != bytes) { 1347 fprintf(stderr, "read_mrecord: short read on payload\n"); 1348 *errorp = EPIPE; 1349 return(NULL); 1350 } 1351 if (mrec->head.rec_crc != 1352 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1353 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1354 fprintf(stderr, "read_mrecord: bad CRC\n"); 1355 *errorp = EINVAL; 1356 return(NULL); 1357 } 1358 *errorp = 0; 1359 return(mrec); 1360 } 1361 1362 static 1363 void 1364 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 1365 int bytes) 1366 { 1367 char zbuf[HAMMER_HEAD_ALIGN]; 1368 int pad; 1369 1370 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1371 1372 assert(bytes >= (int)sizeof(mrec->head)); 1373 bzero(&mrec->head, sizeof(mrec->head)); 1374 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1375 mrec->head.type = type; 1376 mrec->head.rec_size = bytes; 1377 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1378 bytes - HAMMER_MREC_CRCOFF); 1379 if (write(fdout, mrec, bytes) != bytes) { 1380 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1381 errno, strerror(errno)); 1382 exit(1); 1383 } 1384 if (pad) { 1385 bzero(zbuf, pad); 1386 if (write(fdout, zbuf, pad) != pad) { 1387 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1388 errno, strerror(errno)); 1389 exit(1); 1390 } 1391 } 1392 } 1393 1394 /* 1395 * Generate a mirroring header with the pfs information of the 1396 * originating filesytem. 1397 */ 1398 static void 1399 generate_mrec_header(int fd, int pfs_id, 1400 union hammer_ioc_mrecord_any *mrec_tmp) 1401 { 1402 struct hammer_ioc_pseudofs_rw pfs; 1403 1404 bzero(&pfs, sizeof(pfs)); 1405 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1406 pfs.pfs_id = pfs_id; 1407 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1408 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1409 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1410 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1411 exit(1); 1412 } 1413 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1414 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1415 exit(1); 1416 } 1417 mrec_tmp->pfs.version = pfs.version; 1418 } 1419 1420 /* 1421 * Validate the pfs information from the originating filesystem 1422 * against the target filesystem. shared_uuid must match. 1423 * 1424 * return -1 if we got a TERM record 1425 */ 1426 static int 1427 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1428 struct hammer_ioc_mrecord_head *pickup, 1429 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1430 { 1431 struct hammer_ioc_pseudofs_rw pfs; 1432 struct hammer_pseudofs_data pfsd; 1433 hammer_ioc_mrecord_any_t mrec; 1434 int error; 1435 1436 /* 1437 * Get the PFSD info from the target filesystem. 1438 */ 1439 bzero(&pfs, sizeof(pfs)); 1440 bzero(&pfsd, sizeof(pfsd)); 1441 pfs.pfs_id = pfs_id; 1442 pfs.ondisk = &pfsd; 1443 pfs.bytes = sizeof(pfsd); 1444 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1445 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1446 exit(1); 1447 } 1448 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1449 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1450 exit(1); 1451 } 1452 1453 mrec = read_mrecord(fdin, &error, pickup); 1454 if (mrec == NULL) { 1455 if (error == 0) 1456 fprintf(stderr, "validate_mrec_header: short read\n"); 1457 exit(1); 1458 } 1459 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1460 free(mrec); 1461 return(-1); 1462 } 1463 1464 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1465 fprintf(stderr, "validate_mrec_header: did not get expected " 1466 "PFSD record type\n"); 1467 exit(1); 1468 } 1469 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1470 fprintf(stderr, "validate_mrec_header: unexpected payload " 1471 "size\n"); 1472 exit(1); 1473 } 1474 if (mrec->pfs.version != pfs.version) { 1475 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1476 exit(1); 1477 } 1478 1479 /* 1480 * Whew. Ok, is the read PFS info compatible with the target? 1481 */ 1482 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1483 sizeof(pfsd.shared_uuid)) != 0) { 1484 fprintf(stderr, 1485 "mirror-write: source and target have " 1486 "different shared-uuid's!\n"); 1487 exit(1); 1488 } 1489 if (is_target && 1490 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1491 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1492 exit(1); 1493 } 1494 if (tid_begp) 1495 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1496 if (tid_endp) 1497 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1498 free(mrec); 1499 return(0); 1500 } 1501 1502 static void 1503 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1504 { 1505 struct hammer_ioc_pseudofs_rw pfs; 1506 struct hammer_pseudofs_data pfsd; 1507 1508 bzero(&pfs, sizeof(pfs)); 1509 bzero(&pfsd, sizeof(pfsd)); 1510 pfs.pfs_id = pfs_id; 1511 pfs.ondisk = &pfsd; 1512 pfs.bytes = sizeof(pfsd); 1513 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1514 perror("update_pfs_snapshot (read)"); 1515 exit(1); 1516 } 1517 if (pfsd.sync_end_tid != snapshot_tid) { 1518 pfsd.sync_end_tid = snapshot_tid; 1519 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1520 perror("update_pfs_snapshot (rewrite)"); 1521 exit(1); 1522 } 1523 if (VerboseOpt >= 2) { 1524 fprintf(stderr, 1525 "Mirror-write: Completed, updated snapshot " 1526 "to %016jx\n", 1527 (uintmax_t)snapshot_tid); 1528 fflush(stderr); 1529 } 1530 } 1531 } 1532 1533 /* 1534 * Bandwidth-limited write in chunks 1535 */ 1536 static 1537 ssize_t 1538 writebw(int fd, const void *buf, size_t nbytes, 1539 u_int64_t *bwcount, struct timeval *tv1) 1540 { 1541 struct timeval tv2; 1542 size_t n; 1543 ssize_t r; 1544 ssize_t a; 1545 int usec; 1546 1547 a = 0; 1548 r = 0; 1549 while (nbytes) { 1550 if (*bwcount + nbytes > BandwidthOpt) 1551 n = BandwidthOpt - *bwcount; 1552 else 1553 n = nbytes; 1554 if (n) 1555 r = write(fd, buf, n); 1556 if (r >= 0) { 1557 a += r; 1558 nbytes -= r; 1559 buf = (const char *)buf + r; 1560 } 1561 if ((size_t)r != n) 1562 break; 1563 *bwcount += n; 1564 if (*bwcount >= BandwidthOpt) { 1565 gettimeofday(&tv2, NULL); 1566 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1567 (int)(tv2.tv_usec - tv1->tv_usec); 1568 if (usec >= 0 && usec < 1000000) 1569 usleep(1000000 - usec); 1570 gettimeofday(tv1, NULL); 1571 *bwcount -= BandwidthOpt; 1572 } 1573 } 1574 return(a ? a : r); 1575 } 1576 1577 /* 1578 * Get a yes or no answer from the terminal. The program may be run as 1579 * part of a two-way pipe so we cannot use stdin for this operation. 1580 */ 1581 static int 1582 getyn(void) 1583 { 1584 char buf[256]; 1585 FILE *fp; 1586 int result; 1587 1588 fp = fopen("/dev/tty", "r"); 1589 if (fp == NULL) { 1590 fprintf(stderr, "No terminal for response\n"); 1591 return(-1); 1592 } 1593 result = -1; 1594 while (fgets(buf, sizeof(buf), fp) != NULL) { 1595 if (buf[0] == 'y' || buf[0] == 'Y') { 1596 result = 1; 1597 break; 1598 } 1599 if (buf[0] == 'n' || buf[0] == 'N') { 1600 result = 0; 1601 break; 1602 } 1603 fprintf(stderr, "Response not understood\n"); 1604 break; 1605 } 1606 fclose(fp); 1607 return(result); 1608 } 1609 1610 static void 1611 mirror_usage(int code) 1612 { 1613 fprintf(stderr, 1614 "hammer mirror-read <filesystem> [begin-tid]\n" 1615 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1616 "hammer mirror-write <filesystem>\n" 1617 "hammer mirror-dump\n" 1618 "hammer mirror-copy [[user@]host:]<filesystem>" 1619 " [[user@]host:]<filesystem>\n" 1620 "hammer mirror-stream [[user@]host:]<filesystem>" 1621 " [[user@]host:]<filesystem>\n" 1622 ); 1623 exit(code); 1624 } 1625 1626