1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.16 2008/11/09 05:22:56 dillon Exp $ 35 */ 36 37 #include "hammer.h" 38 39 #define SERIALBUF_SIZE (512 * 1024) 40 41 static int read_mrecords(int fd, char *buf, u_int size, 42 hammer_ioc_mrecord_head_t pickup); 43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 44 hammer_ioc_mrecord_head_t pickup); 45 static void write_mrecord(int fdout, u_int32_t type, 46 hammer_ioc_mrecord_any_t mrec, int bytes); 47 static void generate_mrec_header(int fd, int fdout, int pfs_id, 48 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 50 struct hammer_ioc_mrecord_head *pickup, 51 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 53 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 54 u_int64_t *bwcount, struct timeval *tv1); 55 static int getyn(void); 56 static void mirror_usage(int code); 57 58 /* 59 * Generate a mirroring data stream from the specific source over the 60 * entire key range, but restricted to the specified transaction range. 61 * 62 * The HAMMER VFS does most of the work, we add a few new mrecord 63 * types to negotiate the TID ranges and verify that the entire 64 * stream made it to the destination. 65 */ 66 void 67 hammer_cmd_mirror_read(char **av, int ac, int streaming) 68 { 69 struct hammer_ioc_mirror_rw mirror; 70 struct hammer_ioc_pseudofs_rw pfs; 71 union hammer_ioc_mrecord_any mrec_tmp; 72 struct hammer_ioc_mrecord_head pickup; 73 hammer_ioc_mrecord_any_t mrec; 74 hammer_tid_t sync_tid; 75 const char *filesystem; 76 char *buf = malloc(SERIALBUF_SIZE); 77 int interrupted = 0; 78 int error; 79 int fd; 80 int n; 81 int didwork; 82 int64_t total_bytes; 83 time_t base_t = time(NULL); 84 struct timeval bwtv; 85 u_int64_t bwcount; 86 87 if (ac == 0 || ac > 2) 88 mirror_usage(1); 89 filesystem = av[0]; 90 91 pickup.signature = 0; 92 pickup.type = 0; 93 94 again: 95 bzero(&mirror, sizeof(mirror)); 96 hammer_key_beg_init(&mirror.key_beg); 97 hammer_key_end_init(&mirror.key_end); 98 99 fd = getpfs(&pfs, filesystem); 100 101 if (streaming && VerboseOpt) { 102 fprintf(stderr, "\nRunning"); 103 fflush(stderr); 104 } 105 total_bytes = 0; 106 gettimeofday(&bwtv, NULL); 107 bwcount = 0; 108 109 /* 110 * Send initial header for the purpose of determining shared-uuid. 111 */ 112 generate_mrec_header(fd, 1, pfs.pfs_id, NULL, NULL); 113 114 /* 115 * In 2-way mode the target will send us a PFS info packet 116 * first. Use the target's current snapshot TID as our default 117 * begin TID. 118 */ 119 mirror.tid_beg = 0; 120 if (TwoWayPipeOpt) { 121 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 122 NULL, &mirror.tid_beg); 123 if (n < 0) { /* got TERM record */ 124 relpfs(fd, &pfs); 125 return; 126 } 127 ++mirror.tid_beg; 128 } 129 130 /* 131 * Write out the PFS header, tid_beg will be updated if our PFS 132 * has a larger begin sync. tid_end is set to the latest source 133 * TID whos flush cycle has completed. 134 */ 135 generate_mrec_header(fd, 1, pfs.pfs_id, 136 &mirror.tid_beg, &mirror.tid_end); 137 138 /* XXX streaming mode support w/ cycle or command line arg */ 139 /* 140 * A cycle file overrides the beginning TID 141 */ 142 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 143 144 if (ac == 2) 145 mirror.tid_beg = strtoull(av[1], NULL, 0); 146 147 if (streaming == 0 || VerboseOpt >= 2) { 148 fprintf(stderr, 149 "Mirror-read: Mirror from %016llx to %016llx\n", 150 mirror.tid_beg, mirror.tid_end); 151 } 152 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 153 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n", 154 mirror.key_beg.obj_id); 155 } 156 157 /* 158 * Nothing to do if begin equals end. 159 */ 160 if (mirror.tid_beg >= mirror.tid_end) { 161 if (streaming == 0 || VerboseOpt >= 2) 162 fprintf(stderr, "Mirror-read: No work to do\n"); 163 didwork = 0; 164 goto done; 165 } 166 didwork = 1; 167 168 /* 169 * Write out bulk records 170 */ 171 mirror.ubuf = buf; 172 mirror.size = SERIALBUF_SIZE; 173 174 do { 175 mirror.count = 0; 176 mirror.pfs_id = pfs.pfs_id; 177 mirror.shared_uuid = pfs.ondisk->shared_uuid; 178 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 179 fprintf(stderr, "Mirror-read %s failed: %s\n", 180 filesystem, strerror(errno)); 181 exit(1); 182 } 183 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 184 fprintf(stderr, 185 "Mirror-read %s fatal error %d\n", 186 filesystem, mirror.head.error); 187 exit(1); 188 } 189 if (mirror.count) { 190 if (BandwidthOpt) { 191 n = writebw(1, mirror.ubuf, mirror.count, 192 &bwcount, &bwtv); 193 } else { 194 n = write(1, mirror.ubuf, mirror.count); 195 } 196 if (n != mirror.count) { 197 fprintf(stderr, "Mirror-read %s failed: " 198 "short write\n", 199 filesystem); 200 exit(1); 201 } 202 } 203 total_bytes += mirror.count; 204 if (streaming && VerboseOpt) { 205 fprintf(stderr, "\r%016llx %11lld", 206 mirror.key_cur.obj_id, 207 total_bytes); 208 fflush(stderr); 209 } 210 mirror.key_beg = mirror.key_cur; 211 if (TimeoutOpt && 212 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 213 fprintf(stderr, 214 "Mirror-read %s interrupted by timer at" 215 " %016llx\n", 216 filesystem, 217 mirror.key_cur.obj_id); 218 interrupted = 1; 219 break; 220 } 221 } while (mirror.count != 0); 222 223 done: 224 /* 225 * Write out the termination sync record - only if not interrupted 226 */ 227 if (interrupted == 0) { 228 if (didwork) { 229 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 230 &mrec_tmp, sizeof(mrec_tmp.sync)); 231 } else { 232 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 233 &mrec_tmp, sizeof(mrec_tmp.sync)); 234 } 235 } 236 237 /* 238 * If the -2 option was given (automatic when doing mirror-copy), 239 * a two-way pipe is assumed and we expect a response mrec from 240 * the target. 241 */ 242 if (TwoWayPipeOpt) { 243 mrec = read_mrecord(0, &error, &pickup); 244 if (mrec == NULL || 245 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 246 mrec->head.rec_size != sizeof(mrec->update)) { 247 fprintf(stderr, "mirror_read: Did not get final " 248 "acknowledgement packet from target\n"); 249 exit(1); 250 } 251 if (interrupted) { 252 if (CyclePath) { 253 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg); 254 fprintf(stderr, "Cyclefile %s updated for " 255 "continuation\n", CyclePath); 256 } 257 } else { 258 sync_tid = mrec->update.tid; 259 if (CyclePath) { 260 hammer_key_beg_init(&mirror.key_beg); 261 hammer_set_cycle(&mirror.key_beg, sync_tid); 262 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n", 263 CyclePath, sync_tid); 264 } 265 } 266 } else if (CyclePath) { 267 /* NOTE! mirror.tid_beg cannot be updated */ 268 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 269 "fully updated unless you use mirror-copy\n"); 270 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 271 } 272 if (streaming && interrupted == 0) { 273 time_t t1 = time(NULL); 274 time_t t2; 275 276 if (VerboseOpt) { 277 fprintf(stderr, " W"); 278 fflush(stderr); 279 } 280 pfs.ondisk->sync_end_tid = mirror.tid_end; 281 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 282 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 283 filesystem, strerror(errno)); 284 } else { 285 t2 = time(NULL) - t1; 286 if (t2 >= 0 && t2 < DelayOpt) { 287 if (VerboseOpt) { 288 fprintf(stderr, "\bD"); 289 fflush(stderr); 290 } 291 sleep(DelayOpt - t2); 292 } 293 if (VerboseOpt) { 294 fprintf(stderr, "\b "); 295 fflush(stderr); 296 } 297 relpfs(fd, &pfs); 298 goto again; 299 } 300 } 301 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 302 &mrec_tmp, sizeof(mrec_tmp.sync)); 303 relpfs(fd, &pfs); 304 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 305 } 306 307 static void 308 create_pfs(const char *filesystem, uuid_t *s_uuid) 309 { 310 if (ForceYesOpt == 1) { 311 fprintf(stderr, "PFS slave %s does not exist. " 312 "Auto create new slave PFS!\n", filesystem); 313 314 } else { 315 fprintf(stderr, "PFS slave %s does not exist.\n" 316 "Do you want to create a new slave PFS? (yes|no) ", 317 filesystem); 318 fflush(stderr); 319 if (getyn() != 1) { 320 fprintf(stderr, "Aborting operation\n"); 321 exit(1); 322 } 323 } 324 325 u_int32_t status; 326 char *shared_uuid = NULL; 327 uuid_to_string(s_uuid, &shared_uuid, &status); 328 329 char *cmd = NULL; 330 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 331 filesystem, shared_uuid); 332 free(shared_uuid); 333 334 if (cmd == NULL) { 335 fprintf(stderr, "Failed to alloc memory\n"); 336 exit(1); 337 } 338 if (system(cmd) != 0) { 339 fprintf(stderr, "Failed to create PFS\n"); 340 } 341 free(cmd); 342 } 343 344 /* 345 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 346 * some additional packet types to negotiate TID ranges and to verify 347 * completion. The HAMMER VFS does most of the work. 348 * 349 * It is important to note that the mirror.key_{beg,end} range must 350 * match the ranged used by the original. For now both sides use 351 * range the entire key space. 352 * 353 * It is even more important that the records in the stream conform 354 * to the TID range also supplied in the stream. The HAMMER VFS will 355 * use the REC, PASS, and SKIP record types to track the portions of 356 * the B-Tree being scanned in order to be able to proactively delete 357 * records on the target within those active areas that are not mentioned 358 * by the source. 359 * 360 * The mirror.key_cur field is used by the VFS to do this tracking. It 361 * must be initialized to key_beg but then is persistently updated by 362 * the HAMMER VFS on each successive ioctl() call. If you blow up this 363 * field you will blow up the mirror target, possibly to the point of 364 * deleting everything. As a safety measure the HAMMER VFS simply marks 365 * the records that the source has destroyed as deleted on the target, 366 * and normal pruning operations will deal with their final disposition 367 * at some later time. 368 */ 369 void 370 hammer_cmd_mirror_write(char **av, int ac) 371 { 372 struct hammer_ioc_mirror_rw mirror; 373 const char *filesystem; 374 char *buf = malloc(SERIALBUF_SIZE); 375 struct hammer_ioc_pseudofs_rw pfs; 376 struct hammer_ioc_mrecord_head pickup; 377 struct hammer_ioc_synctid synctid; 378 union hammer_ioc_mrecord_any mrec_tmp; 379 hammer_ioc_mrecord_any_t mrec; 380 struct stat st; 381 int error; 382 int fd; 383 int n; 384 385 if (ac != 1) 386 mirror_usage(1); 387 filesystem = av[0]; 388 389 pickup.signature = 0; 390 pickup.type = 0; 391 392 again: 393 bzero(&mirror, sizeof(mirror)); 394 hammer_key_beg_init(&mirror.key_beg); 395 hammer_key_end_init(&mirror.key_end); 396 mirror.key_end = mirror.key_beg; 397 398 /* 399 * Read initial packet 400 */ 401 mrec = read_mrecord(0, &error, &pickup); 402 if (mrec == NULL) { 403 if (error == 0) 404 fprintf(stderr, "validate_mrec_header: short read\n"); 405 exit(1); 406 } 407 /* 408 * Validate packet 409 */ 410 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 411 return; 412 } 413 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 414 fprintf(stderr, "validate_mrec_header: did not get expected " 415 "PFSD record type\n"); 416 exit(1); 417 } 418 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 419 fprintf(stderr, "validate_mrec_header: unexpected payload " 420 "size\n"); 421 exit(1); 422 } 423 424 /* 425 * Create slave PFS if it doesn't yet exist 426 */ 427 if (lstat(filesystem, &st) != 0) { 428 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 429 } 430 free(mrec); 431 mrec = NULL; 432 433 fd = getpfs(&pfs, filesystem); 434 435 /* 436 * In two-way mode the target writes out a PFS packet first. 437 * The source uses our tid_end as its tid_beg by default, 438 * picking up where it left off. 439 */ 440 mirror.tid_beg = 0; 441 if (TwoWayPipeOpt) { 442 generate_mrec_header(fd, 1, pfs.pfs_id, 443 &mirror.tid_beg, &mirror.tid_end); 444 } 445 446 /* 447 * Read and process the PFS header. The source informs us of 448 * the TID range the stream represents. 449 */ 450 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 451 &mirror.tid_beg, &mirror.tid_end); 452 if (n < 0) { /* got TERM record */ 453 relpfs(fd, &pfs); 454 return; 455 } 456 457 mirror.ubuf = buf; 458 mirror.size = SERIALBUF_SIZE; 459 460 /* 461 * Read and process bulk records (REC, PASS, and SKIP types). 462 * 463 * On your life, do NOT mess with mirror.key_cur or your mirror 464 * target may become history. 465 */ 466 for (;;) { 467 mirror.count = 0; 468 mirror.pfs_id = pfs.pfs_id; 469 mirror.shared_uuid = pfs.ondisk->shared_uuid; 470 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 471 if (mirror.size <= 0) 472 break; 473 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 474 fprintf(stderr, "Mirror-write %s failed: %s\n", 475 filesystem, strerror(errno)); 476 exit(1); 477 } 478 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 479 fprintf(stderr, 480 "Mirror-write %s fatal error %d\n", 481 filesystem, mirror.head.error); 482 exit(1); 483 } 484 #if 0 485 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 486 fprintf(stderr, 487 "Mirror-write %s interrupted by timer at" 488 " %016llx\n", 489 filesystem, 490 mirror.key_cur.obj_id); 491 exit(0); 492 } 493 #endif 494 } 495 496 /* 497 * Read and process the termination sync record. 498 */ 499 mrec = read_mrecord(0, &error, &pickup); 500 501 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 502 fprintf(stderr, "Mirror-write: received termination request\n"); 503 free(mrec); 504 return; 505 } 506 507 if (mrec == NULL || 508 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 509 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 510 mrec->head.rec_size != sizeof(mrec->sync)) { 511 fprintf(stderr, "Mirror-write %s: Did not get termination " 512 "sync record, or rec_size is wrong rt=%d\n", 513 filesystem, mrec->head.type); 514 exit(1); 515 } 516 517 /* 518 * Update the PFS info on the target so the user has visibility 519 * into the new snapshot, and sync the target filesystem. 520 */ 521 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 522 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 523 524 bzero(&synctid, sizeof(synctid)); 525 synctid.op = HAMMER_SYNCTID_SYNC2; 526 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 527 528 if (VerboseOpt >= 2) { 529 fprintf(stderr, "Mirror-write %s: succeeded\n", 530 filesystem); 531 } 532 } 533 534 free(mrec); 535 mrec = NULL; 536 537 /* 538 * Report back to the originator. 539 */ 540 if (TwoWayPipeOpt) { 541 mrec_tmp.update.tid = mirror.tid_end; 542 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 543 &mrec_tmp, sizeof(mrec_tmp.update)); 544 } else { 545 printf("Source can update synctid to 0x%016llx\n", 546 mirror.tid_end); 547 } 548 relpfs(fd, &pfs); 549 goto again; 550 } 551 552 void 553 hammer_cmd_mirror_dump(void) 554 { 555 char *buf = malloc(SERIALBUF_SIZE); 556 struct hammer_ioc_mrecord_head pickup; 557 hammer_ioc_mrecord_any_t mrec; 558 int error; 559 int size; 560 int offset; 561 int bytes; 562 563 /* 564 * Read and process the PFS header 565 */ 566 pickup.signature = 0; 567 pickup.type = 0; 568 569 mrec = read_mrecord(0, &error, &pickup); 570 571 /* 572 * Read and process bulk records 573 */ 574 for (;;) { 575 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 576 if (size <= 0) 577 break; 578 offset = 0; 579 while (offset < size) { 580 mrec = (void *)((char *)buf + offset); 581 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 582 if (offset + bytes > size) { 583 fprintf(stderr, "Misaligned record\n"); 584 exit(1); 585 } 586 587 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 588 case HAMMER_MREC_TYPE_REC_BADCRC: 589 case HAMMER_MREC_TYPE_REC: 590 printf("Record obj=%016llx key=%016llx " 591 "rt=%02x ot=%02x", 592 mrec->rec.leaf.base.obj_id, 593 mrec->rec.leaf.base.key, 594 mrec->rec.leaf.base.rec_type, 595 mrec->rec.leaf.base.obj_type); 596 if (mrec->head.type == 597 HAMMER_MREC_TYPE_REC_BADCRC) { 598 printf(" (BAD CRC)"); 599 } 600 printf("\n"); 601 printf(" tids %016llx:%016llx data=%d\n", 602 mrec->rec.leaf.base.create_tid, 603 mrec->rec.leaf.base.delete_tid, 604 mrec->rec.leaf.data_len); 605 break; 606 case HAMMER_MREC_TYPE_PASS: 607 printf("Pass obj=%016llx key=%016llx " 608 "rt=%02x ot=%02x\n", 609 mrec->rec.leaf.base.obj_id, 610 mrec->rec.leaf.base.key, 611 mrec->rec.leaf.base.rec_type, 612 mrec->rec.leaf.base.obj_type); 613 printf(" tids %016llx:%016llx data=%d\n", 614 mrec->rec.leaf.base.create_tid, 615 mrec->rec.leaf.base.delete_tid, 616 mrec->rec.leaf.data_len); 617 break; 618 case HAMMER_MREC_TYPE_SKIP: 619 printf("Skip obj=%016llx key=%016llx rt=%02x to\n" 620 " obj=%016llx key=%016llx rt=%02x\n", 621 mrec->skip.skip_beg.obj_id, 622 mrec->skip.skip_beg.key, 623 mrec->skip.skip_beg.rec_type, 624 mrec->skip.skip_end.obj_id, 625 mrec->skip.skip_end.key, 626 mrec->skip.skip_end.rec_type); 627 default: 628 break; 629 } 630 offset += bytes; 631 } 632 } 633 634 /* 635 * Read and process the termination sync record. 636 */ 637 mrec = read_mrecord(0, &error, &pickup); 638 if (mrec == NULL || 639 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 640 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 641 ) { 642 fprintf(stderr, "Mirror-dump: Did not get termination " 643 "sync record\n"); 644 } 645 } 646 647 void 648 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 649 { 650 pid_t pid1; 651 pid_t pid2; 652 int fds[2]; 653 const char *xav[16]; 654 char tbuf[16]; 655 char *ptr; 656 int xac; 657 658 if (ac != 2) 659 mirror_usage(1); 660 661 if (pipe(fds) < 0) { 662 perror("pipe"); 663 exit(1); 664 } 665 666 TwoWayPipeOpt = 1; 667 668 /* 669 * Source 670 */ 671 if ((pid1 = fork()) == 0) { 672 dup2(fds[0], 0); 673 dup2(fds[0], 1); 674 close(fds[0]); 675 close(fds[1]); 676 if ((ptr = strchr(av[0], ':')) != NULL) { 677 *ptr++ = 0; 678 xac = 0; 679 xav[xac++] = "ssh"; 680 xav[xac++] = av[0]; 681 xav[xac++] = "hammer"; 682 683 switch(VerboseOpt) { 684 case 0: 685 break; 686 case 1: 687 xav[xac++] = "-v"; 688 break; 689 case 2: 690 xav[xac++] = "-vv"; 691 break; 692 default: 693 xav[xac++] = "-vvv"; 694 break; 695 } 696 if (ForceYesOpt) { 697 xav[xac++] = "-y"; 698 } 699 xav[xac++] = "-2"; 700 if (TimeoutOpt) { 701 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 702 xav[xac++] = "-t"; 703 xav[xac++] = tbuf; 704 } 705 if (streaming) 706 xav[xac++] = "mirror-read-stream"; 707 else 708 xav[xac++] = "mirror-read"; 709 xav[xac++] = ptr; 710 xav[xac++] = NULL; 711 execv("/usr/bin/ssh", (void *)xav); 712 } else { 713 hammer_cmd_mirror_read(av, 1, streaming); 714 fflush(stdout); 715 fflush(stderr); 716 } 717 _exit(1); 718 } 719 720 /* 721 * Target 722 */ 723 if ((pid2 = fork()) == 0) { 724 dup2(fds[1], 0); 725 dup2(fds[1], 1); 726 close(fds[0]); 727 close(fds[1]); 728 if ((ptr = strchr(av[1], ':')) != NULL) { 729 *ptr++ = 0; 730 xac = 0; 731 xav[xac++] = "ssh"; 732 xav[xac++] = av[1]; 733 xav[xac++] = "hammer"; 734 735 switch(VerboseOpt) { 736 case 0: 737 break; 738 case 1: 739 xav[xac++] = "-v"; 740 break; 741 case 2: 742 xav[xac++] = "-vv"; 743 break; 744 default: 745 xav[xac++] = "-vvv"; 746 break; 747 } 748 if (ForceYesOpt) { 749 xav[xac++] = "-y"; 750 } 751 xav[xac++] = "-2"; 752 xav[xac++] = "mirror-write"; 753 xav[xac++] = ptr; 754 xav[xac++] = NULL; 755 execv("/usr/bin/ssh", (void *)xav); 756 } else { 757 hammer_cmd_mirror_write(av + 1, 1); 758 fflush(stdout); 759 fflush(stderr); 760 } 761 _exit(1); 762 } 763 close(fds[0]); 764 close(fds[1]); 765 766 while (waitpid(pid1, NULL, 0) <= 0) 767 ; 768 while (waitpid(pid2, NULL, 0) <= 0) 769 ; 770 } 771 772 /* 773 * Read and return multiple mrecords 774 */ 775 static int 776 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 777 { 778 hammer_ioc_mrecord_any_t mrec; 779 u_int count; 780 size_t n; 781 size_t i; 782 size_t bytes; 783 int type; 784 785 count = 0; 786 while (size - count >= HAMMER_MREC_HEADSIZE) { 787 /* 788 * Cached the record header in case we run out of buffer 789 * space. 790 */ 791 fflush(stdout); 792 if (pickup->signature == 0) { 793 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 794 i = read(fd, (char *)pickup + n, 795 HAMMER_MREC_HEADSIZE - n); 796 if (i <= 0) 797 break; 798 } 799 if (n == 0) 800 break; 801 if (n != HAMMER_MREC_HEADSIZE) { 802 fprintf(stderr, "read_mrecords: short read on pipe\n"); 803 exit(1); 804 } 805 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 806 fprintf(stderr, "read_mrecords: malformed record on pipe, " 807 "bad signature\n"); 808 exit(1); 809 } 810 } 811 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 812 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 813 fprintf(stderr, "read_mrecords: malformed record on pipe, " 814 "illegal rec_size\n"); 815 exit(1); 816 } 817 818 /* 819 * Stop if we have insufficient space for the record and data. 820 */ 821 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 822 if (size - count < bytes) 823 break; 824 825 /* 826 * Stop if the record type is not a REC, SKIP, or PASS, 827 * which are the only types the ioctl supports. Other types 828 * are used only by the userland protocol. 829 * 830 * Ignore all flags. 831 */ 832 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 833 if (type != HAMMER_MREC_TYPE_PFSD && 834 type != HAMMER_MREC_TYPE_REC && 835 type != HAMMER_MREC_TYPE_SKIP && 836 type != HAMMER_MREC_TYPE_PASS) { 837 break; 838 } 839 840 /* 841 * Read the remainder and clear the pickup signature. 842 */ 843 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 844 i = read(fd, buf + count + n, bytes - n); 845 if (i <= 0) 846 break; 847 } 848 if (n != bytes) { 849 fprintf(stderr, "read_mrecords: short read on pipe\n"); 850 exit(1); 851 } 852 853 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 854 pickup->signature = 0; 855 pickup->type = 0; 856 mrec = (void *)(buf + count); 857 858 /* 859 * Validate the completed record 860 */ 861 if (mrec->head.rec_crc != 862 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 863 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 864 fprintf(stderr, "read_mrecords: malformed record " 865 "on pipe, bad crc\n"); 866 exit(1); 867 } 868 869 /* 870 * If its a B-Tree record validate the data crc. 871 * 872 * NOTE: If the VFS passes us an explicitly errorde mrec 873 * we just pass it through. 874 */ 875 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 876 877 if (type == HAMMER_MREC_TYPE_REC) { 878 if (mrec->head.rec_size < 879 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 880 fprintf(stderr, 881 "read_mrecords: malformed record on " 882 "pipe, illegal element data_len\n"); 883 exit(1); 884 } 885 if (mrec->rec.leaf.data_len && 886 mrec->rec.leaf.data_offset && 887 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 888 fprintf(stderr, 889 "read_mrecords: data_crc did not " 890 "match data! obj=%016llx key=%016llx\n", 891 mrec->rec.leaf.base.obj_id, 892 mrec->rec.leaf.base.key); 893 fprintf(stderr, 894 "continuing, but there are problems\n"); 895 } 896 } 897 count += bytes; 898 } 899 return(count); 900 } 901 902 /* 903 * Read and return a single mrecord. 904 */ 905 static 906 hammer_ioc_mrecord_any_t 907 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 908 { 909 hammer_ioc_mrecord_any_t mrec; 910 struct hammer_ioc_mrecord_head mrechd; 911 size_t bytes; 912 size_t n; 913 size_t i; 914 915 if (pickup && pickup->type != 0) { 916 mrechd = *pickup; 917 pickup->signature = 0; 918 pickup->type = 0; 919 n = HAMMER_MREC_HEADSIZE; 920 } else { 921 /* 922 * Read in the PFSD header from the sender. 923 */ 924 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 925 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 926 if (i <= 0) 927 break; 928 } 929 if (n == 0) { 930 *errorp = 0; /* EOF */ 931 return(NULL); 932 } 933 if (n != HAMMER_MREC_HEADSIZE) { 934 fprintf(stderr, "short read of mrecord header\n"); 935 *errorp = EPIPE; 936 return(NULL); 937 } 938 } 939 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 940 fprintf(stderr, "read_mrecord: bad signature\n"); 941 *errorp = EINVAL; 942 return(NULL); 943 } 944 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 945 assert(bytes >= sizeof(mrechd)); 946 mrec = malloc(bytes); 947 mrec->head = mrechd; 948 949 while (n < bytes) { 950 i = read(fdin, (char *)mrec + n, bytes - n); 951 if (i <= 0) 952 break; 953 n += i; 954 } 955 if (n != bytes) { 956 fprintf(stderr, "read_mrecord: short read on payload\n"); 957 *errorp = EPIPE; 958 return(NULL); 959 } 960 if (mrec->head.rec_crc != 961 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 962 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 963 fprintf(stderr, "read_mrecord: bad CRC\n"); 964 *errorp = EINVAL; 965 return(NULL); 966 } 967 *errorp = 0; 968 return(mrec); 969 } 970 971 static 972 void 973 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 974 int bytes) 975 { 976 char zbuf[HAMMER_HEAD_ALIGN]; 977 int pad; 978 979 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 980 981 assert(bytes >= (int)sizeof(mrec->head)); 982 bzero(&mrec->head, sizeof(mrec->head)); 983 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 984 mrec->head.type = type; 985 mrec->head.rec_size = bytes; 986 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 987 bytes - HAMMER_MREC_CRCOFF); 988 if (write(fdout, mrec, bytes) != bytes) { 989 fprintf(stderr, "write_mrecord: error %d (%s)\n", 990 errno, strerror(errno)); 991 exit(1); 992 } 993 if (pad) { 994 bzero(zbuf, pad); 995 if (write(fdout, zbuf, pad) != pad) { 996 fprintf(stderr, "write_mrecord: error %d (%s)\n", 997 errno, strerror(errno)); 998 exit(1); 999 } 1000 } 1001 } 1002 1003 /* 1004 * Generate a mirroring header with the pfs information of the 1005 * originating filesytem. 1006 */ 1007 static void 1008 generate_mrec_header(int fd, int fdout, int pfs_id, 1009 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1010 { 1011 struct hammer_ioc_pseudofs_rw pfs; 1012 union hammer_ioc_mrecord_any mrec_tmp; 1013 1014 bzero(&pfs, sizeof(pfs)); 1015 bzero(&mrec_tmp, sizeof(mrec_tmp)); 1016 pfs.pfs_id = pfs_id; 1017 pfs.ondisk = &mrec_tmp.pfs.pfsd; 1018 pfs.bytes = sizeof(mrec_tmp.pfs.pfsd); 1019 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1020 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1021 exit(1); 1022 } 1023 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1024 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1025 exit(1); 1026 } 1027 1028 /* 1029 * sync_beg_tid - lowest TID on source after which a full history 1030 * is available. 1031 * 1032 * sync_end_tid - highest fully synchronized TID from source. 1033 */ 1034 if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid) 1035 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid; 1036 if (tid_endp) 1037 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid; 1038 mrec_tmp.pfs.version = pfs.version; 1039 write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD, 1040 &mrec_tmp, sizeof(mrec_tmp.pfs)); 1041 } 1042 1043 /* 1044 * Validate the pfs information from the originating filesystem 1045 * against the target filesystem. shared_uuid must match. 1046 * 1047 * return -1 if we got a TERM record 1048 */ 1049 static int 1050 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1051 struct hammer_ioc_mrecord_head *pickup, 1052 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1053 { 1054 struct hammer_ioc_pseudofs_rw pfs; 1055 struct hammer_pseudofs_data pfsd; 1056 hammer_ioc_mrecord_any_t mrec; 1057 int error; 1058 1059 /* 1060 * Get the PFSD info from the target filesystem. 1061 */ 1062 bzero(&pfs, sizeof(pfs)); 1063 bzero(&pfsd, sizeof(pfsd)); 1064 pfs.pfs_id = pfs_id; 1065 pfs.ondisk = &pfsd; 1066 pfs.bytes = sizeof(pfsd); 1067 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1068 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1069 exit(1); 1070 } 1071 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1072 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1073 exit(1); 1074 } 1075 1076 mrec = read_mrecord(fdin, &error, pickup); 1077 if (mrec == NULL) { 1078 if (error == 0) 1079 fprintf(stderr, "validate_mrec_header: short read\n"); 1080 exit(1); 1081 } 1082 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1083 free(mrec); 1084 return(-1); 1085 } 1086 1087 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1088 fprintf(stderr, "validate_mrec_header: did not get expected " 1089 "PFSD record type\n"); 1090 exit(1); 1091 } 1092 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1093 fprintf(stderr, "validate_mrec_header: unexpected payload " 1094 "size\n"); 1095 exit(1); 1096 } 1097 if (mrec->pfs.version != pfs.version) { 1098 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1099 exit(1); 1100 } 1101 1102 /* 1103 * Whew. Ok, is the read PFS info compatible with the target? 1104 */ 1105 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1106 sizeof(pfsd.shared_uuid)) != 0) { 1107 fprintf(stderr, 1108 "mirror-write: source and target have " 1109 "different shared-uuid's!\n"); 1110 exit(1); 1111 } 1112 if (is_target && 1113 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1114 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1115 exit(1); 1116 } 1117 if (tid_begp) 1118 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1119 if (tid_endp) 1120 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1121 free(mrec); 1122 return(0); 1123 } 1124 1125 static void 1126 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1127 { 1128 struct hammer_ioc_pseudofs_rw pfs; 1129 struct hammer_pseudofs_data pfsd; 1130 1131 bzero(&pfs, sizeof(pfs)); 1132 bzero(&pfsd, sizeof(pfsd)); 1133 pfs.pfs_id = pfs_id; 1134 pfs.ondisk = &pfsd; 1135 pfs.bytes = sizeof(pfsd); 1136 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1137 perror("update_pfs_snapshot (read)"); 1138 exit(1); 1139 } 1140 if (pfsd.sync_end_tid != snapshot_tid) { 1141 pfsd.sync_end_tid = snapshot_tid; 1142 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1143 perror("update_pfs_snapshot (rewrite)"); 1144 exit(1); 1145 } 1146 if (VerboseOpt >= 2) { 1147 fprintf(stderr, 1148 "Mirror-write: Completed, updated snapshot " 1149 "to %016llx\n", 1150 snapshot_tid); 1151 } 1152 } 1153 } 1154 1155 /* 1156 * Bandwidth-limited write in chunks 1157 */ 1158 static 1159 ssize_t 1160 writebw(int fd, const void *buf, size_t nbytes, 1161 u_int64_t *bwcount, struct timeval *tv1) 1162 { 1163 struct timeval tv2; 1164 size_t n; 1165 ssize_t r; 1166 ssize_t a; 1167 int usec; 1168 1169 a = 0; 1170 r = 0; 1171 while (nbytes) { 1172 if (*bwcount + nbytes > BandwidthOpt) 1173 n = BandwidthOpt - *bwcount; 1174 else 1175 n = nbytes; 1176 if (n) 1177 r = write(fd, buf, n); 1178 if (r >= 0) { 1179 a += r; 1180 nbytes -= r; 1181 buf = (const char *)buf + r; 1182 } 1183 if ((size_t)r != n) 1184 break; 1185 *bwcount += n; 1186 if (*bwcount >= BandwidthOpt) { 1187 gettimeofday(&tv2, NULL); 1188 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1189 (int)(tv2.tv_usec - tv1->tv_usec); 1190 if (usec >= 0 && usec < 1000000) 1191 usleep(1000000 - usec); 1192 gettimeofday(tv1, NULL); 1193 *bwcount -= BandwidthOpt; 1194 } 1195 } 1196 return(a ? a : r); 1197 } 1198 1199 /* 1200 * Get a yes or no answer from the terminal. The program may be run as 1201 * part of a two-way pipe so we cannot use stdin for this operation. 1202 */ 1203 static int 1204 getyn(void) 1205 { 1206 char buf[256]; 1207 FILE *fp; 1208 int result; 1209 1210 fp = fopen("/dev/tty", "r"); 1211 if (fp == NULL) { 1212 fprintf(stderr, "No terminal for response\n"); 1213 return(-1); 1214 } 1215 result = -1; 1216 while (fgets(buf, sizeof(buf), fp) != NULL) { 1217 if (buf[0] == 'y' || buf[0] == 'Y') { 1218 result = 1; 1219 break; 1220 } 1221 if (buf[0] == 'n' || buf[0] == 'N') { 1222 result = 0; 1223 break; 1224 } 1225 fprintf(stderr, "Response not understood\n"); 1226 break; 1227 } 1228 fclose(fp); 1229 return(result); 1230 } 1231 1232 static void 1233 mirror_usage(int code) 1234 { 1235 fprintf(stderr, 1236 "hammer mirror-read <filesystem> [begin-tid]\n" 1237 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1238 "hammer mirror-write <filesystem>\n" 1239 "hammer mirror-dump\n" 1240 "hammer mirror-copy [[user@]host:]<filesystem>" 1241 " [[user@]host:]<filesystem>\n" 1242 "hammer mirror-stream [[user@]host:]<filesystem>" 1243 " [[user@]host:]<filesystem>\n" 1244 ); 1245 exit(code); 1246 } 1247 1248