1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.16 2008/11/09 05:22:56 dillon Exp $ 35 */ 36 37 #include "hammer.h" 38 39 #define SERIALBUF_SIZE (512 * 1024) 40 41 static int read_mrecords(int fd, char *buf, u_int size, 42 hammer_ioc_mrecord_head_t pickup); 43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 44 hammer_ioc_mrecord_head_t pickup); 45 static void write_mrecord(int fdout, u_int32_t type, 46 hammer_ioc_mrecord_any_t mrec, int bytes); 47 static void generate_mrec_header(int fd, int fdout, int pfs_id, 48 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 50 struct hammer_ioc_mrecord_head *pickup, 51 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 53 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 54 u_int64_t *bwcount, struct timeval *tv1); 55 static int getyn(void); 56 static void mirror_usage(int code); 57 58 /* 59 * Generate a mirroring data stream from the specific source over the 60 * entire key range, but restricted to the specified transaction range. 61 * 62 * The HAMMER VFS does most of the work, we add a few new mrecord 63 * types to negotiate the TID ranges and verify that the entire 64 * stream made it to the destination. 65 */ 66 void 67 hammer_cmd_mirror_read(char **av, int ac, int streaming) 68 { 69 struct hammer_ioc_mirror_rw mirror; 70 struct hammer_ioc_pseudofs_rw pfs; 71 union hammer_ioc_mrecord_any mrec_tmp; 72 struct hammer_ioc_mrecord_head pickup; 73 hammer_ioc_mrecord_any_t mrec; 74 hammer_tid_t sync_tid; 75 const char *filesystem; 76 char *buf = malloc(SERIALBUF_SIZE); 77 int interrupted = 0; 78 int error; 79 int fd; 80 int n; 81 int didwork; 82 int64_t total_bytes; 83 time_t base_t = time(NULL); 84 struct timeval bwtv; 85 u_int64_t bwcount; 86 87 if (ac == 0 || ac > 2) 88 mirror_usage(1); 89 filesystem = av[0]; 90 91 pickup.signature = 0; 92 pickup.type = 0; 93 94 again: 95 bzero(&mirror, sizeof(mirror)); 96 hammer_key_beg_init(&mirror.key_beg); 97 hammer_key_end_init(&mirror.key_end); 98 99 fd = getpfs(&pfs, filesystem); 100 101 if (streaming && VerboseOpt) { 102 fprintf(stderr, "\nRunning"); 103 fflush(stderr); 104 } 105 total_bytes = 0; 106 gettimeofday(&bwtv, NULL); 107 bwcount = 0; 108 109 /* 110 * Send initial header for the purpose of determining shared-uuid. 111 */ 112 generate_mrec_header(fd, 1, pfs.pfs_id, NULL, NULL); 113 114 /* 115 * In 2-way mode the target will send us a PFS info packet 116 * first. Use the target's current snapshot TID as our default 117 * begin TID. 118 */ 119 mirror.tid_beg = 0; 120 if (TwoWayPipeOpt) { 121 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 122 NULL, &mirror.tid_beg); 123 if (n < 0) { /* got TERM record */ 124 relpfs(fd, &pfs); 125 return; 126 } 127 ++mirror.tid_beg; 128 } 129 130 /* 131 * Write out the PFS header, tid_beg will be updated if our PFS 132 * has a larger begin sync. tid_end is set to the latest source 133 * TID whos flush cycle has completed. 134 */ 135 generate_mrec_header(fd, 1, pfs.pfs_id, 136 &mirror.tid_beg, &mirror.tid_end); 137 138 /* XXX streaming mode support w/ cycle or command line arg */ 139 /* 140 * A cycle file overrides the beginning TID 141 */ 142 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 143 144 if (ac == 2) 145 mirror.tid_beg = strtoull(av[1], NULL, 0); 146 147 if (streaming == 0 || VerboseOpt >= 2) { 148 fprintf(stderr, 149 "Mirror-read: Mirror from %016llx to %016llx\n", 150 mirror.tid_beg, mirror.tid_end); 151 } 152 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 153 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n", 154 mirror.key_beg.obj_id); 155 } 156 157 /* 158 * Nothing to do if begin equals end. 159 */ 160 if (mirror.tid_beg >= mirror.tid_end) { 161 if (streaming == 0 || VerboseOpt >= 2) 162 fprintf(stderr, "Mirror-read: No work to do\n"); 163 didwork = 0; 164 goto done; 165 } 166 didwork = 1; 167 168 /* 169 * Write out bulk records 170 */ 171 mirror.ubuf = buf; 172 mirror.size = SERIALBUF_SIZE; 173 174 do { 175 mirror.count = 0; 176 mirror.pfs_id = pfs.pfs_id; 177 mirror.shared_uuid = pfs.ondisk->shared_uuid; 178 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 179 fprintf(stderr, "Mirror-read %s failed: %s\n", 180 filesystem, strerror(errno)); 181 exit(1); 182 } 183 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 184 fprintf(stderr, 185 "Mirror-read %s fatal error %d\n", 186 filesystem, mirror.head.error); 187 exit(1); 188 } 189 if (mirror.count) { 190 if (BandwidthOpt) { 191 n = writebw(1, mirror.ubuf, mirror.count, 192 &bwcount, &bwtv); 193 } else { 194 n = write(1, mirror.ubuf, mirror.count); 195 } 196 if (n != mirror.count) { 197 fprintf(stderr, "Mirror-read %s failed: " 198 "short write\n", 199 filesystem); 200 exit(1); 201 } 202 } 203 total_bytes += mirror.count; 204 if (streaming && VerboseOpt) { 205 fprintf(stderr, "\r%016llx %11lld", 206 mirror.key_cur.obj_id, 207 total_bytes); 208 fflush(stderr); 209 } 210 mirror.key_beg = mirror.key_cur; 211 if (TimeoutOpt && 212 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 213 fprintf(stderr, 214 "Mirror-read %s interrupted by timer at" 215 " %016llx\n", 216 filesystem, 217 mirror.key_cur.obj_id); 218 interrupted = 1; 219 break; 220 } 221 } while (mirror.count != 0); 222 223 done: 224 /* 225 * Write out the termination sync record - only if not interrupted 226 */ 227 if (interrupted == 0) { 228 if (didwork) { 229 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 230 &mrec_tmp, sizeof(mrec_tmp.sync)); 231 } else { 232 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 233 &mrec_tmp, sizeof(mrec_tmp.sync)); 234 } 235 } 236 237 /* 238 * If the -2 option was given (automatic when doing mirror-copy), 239 * a two-way pipe is assumed and we expect a response mrec from 240 * the target. 241 */ 242 if (TwoWayPipeOpt) { 243 mrec = read_mrecord(0, &error, &pickup); 244 if (mrec == NULL || 245 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 246 mrec->head.rec_size != sizeof(mrec->update)) { 247 fprintf(stderr, "mirror_read: Did not get final " 248 "acknowledgement packet from target\n"); 249 exit(1); 250 } 251 if (interrupted) { 252 if (CyclePath) { 253 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg); 254 fprintf(stderr, "Cyclefile %s updated for " 255 "continuation\n", CyclePath); 256 } 257 } else { 258 sync_tid = mrec->update.tid; 259 if (CyclePath) { 260 hammer_key_beg_init(&mirror.key_beg); 261 hammer_set_cycle(&mirror.key_beg, sync_tid); 262 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n", 263 CyclePath, sync_tid); 264 } 265 } 266 } else if (CyclePath) { 267 /* NOTE! mirror.tid_beg cannot be updated */ 268 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 269 "fully updated unless you use mirror-copy\n"); 270 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 271 } 272 if (streaming && interrupted == 0) { 273 time_t t1 = time(NULL); 274 time_t t2; 275 276 if (VerboseOpt) { 277 fprintf(stderr, " W"); 278 fflush(stderr); 279 } 280 pfs.ondisk->sync_end_tid = mirror.tid_end; 281 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 282 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 283 filesystem, strerror(errno)); 284 } else { 285 t2 = time(NULL) - t1; 286 if (t2 >= 0 && t2 < DelayOpt) { 287 if (VerboseOpt) { 288 fprintf(stderr, "\bD"); 289 fflush(stderr); 290 } 291 sleep(DelayOpt - t2); 292 } 293 if (VerboseOpt) { 294 fprintf(stderr, "\b "); 295 fflush(stderr); 296 } 297 relpfs(fd, &pfs); 298 goto again; 299 } 300 } 301 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 302 &mrec_tmp, sizeof(mrec_tmp.sync)); 303 relpfs(fd, &pfs); 304 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 305 } 306 307 static void 308 create_pfs(const char *filesystem, uuid_t *s_uuid) 309 { 310 if (ForceYesOpt == 1) 311 { 312 fprintf(stderr, "PFS slave %s does not exist. " 313 "Auto create new slave PFS!\n", filesystem); 314 315 } 316 else 317 { 318 fprintf(stderr, "PFS slave %s does not exist.\n" 319 "Do you want to create a new slave PFS? (yes|no) ", 320 filesystem); 321 fflush(stderr); 322 if (getyn() != 1) { 323 fprintf(stderr, "Aborting operation\n"); 324 exit(1); 325 } 326 } 327 328 u_int32_t status; 329 char *shared_uuid = NULL; 330 uuid_to_string(s_uuid, &shared_uuid, &status); 331 332 char *cmd = NULL; 333 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 334 filesystem, shared_uuid); 335 free(shared_uuid); 336 337 if (cmd == NULL) { 338 fprintf(stderr, "Failed to alloc memory\n"); 339 exit(1); 340 } 341 if (system(cmd) != 0) { 342 fprintf(stderr, "Failed to create PFS\n"); 343 } 344 free(cmd); 345 } 346 347 /* 348 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 349 * some additional packet types to negotiate TID ranges and to verify 350 * completion. The HAMMER VFS does most of the work. 351 * 352 * It is important to note that the mirror.key_{beg,end} range must 353 * match the ranged used by the original. For now both sides use 354 * range the entire key space. 355 * 356 * It is even more important that the records in the stream conform 357 * to the TID range also supplied in the stream. The HAMMER VFS will 358 * use the REC, PASS, and SKIP record types to track the portions of 359 * the B-Tree being scanned in order to be able to proactively delete 360 * records on the target within those active areas that are not mentioned 361 * by the source. 362 * 363 * The mirror.key_cur field is used by the VFS to do this tracking. It 364 * must be initialized to key_beg but then is persistently updated by 365 * the HAMMER VFS on each successive ioctl() call. If you blow up this 366 * field you will blow up the mirror target, possibly to the point of 367 * deleting everything. As a safety measure the HAMMER VFS simply marks 368 * the records that the source has destroyed as deleted on the target, 369 * and normal pruning operations will deal with their final disposition 370 * at some later time. 371 */ 372 void 373 hammer_cmd_mirror_write(char **av, int ac) 374 { 375 struct hammer_ioc_mirror_rw mirror; 376 const char *filesystem; 377 char *buf = malloc(SERIALBUF_SIZE); 378 struct hammer_ioc_pseudofs_rw pfs; 379 struct hammer_ioc_mrecord_head pickup; 380 struct hammer_ioc_synctid synctid; 381 union hammer_ioc_mrecord_any mrec_tmp; 382 hammer_ioc_mrecord_any_t mrec; 383 struct stat st; 384 int error; 385 int fd; 386 int n; 387 388 if (ac != 1) 389 mirror_usage(1); 390 filesystem = av[0]; 391 392 pickup.signature = 0; 393 pickup.type = 0; 394 395 again: 396 bzero(&mirror, sizeof(mirror)); 397 hammer_key_beg_init(&mirror.key_beg); 398 hammer_key_end_init(&mirror.key_end); 399 mirror.key_end = mirror.key_beg; 400 401 /* 402 * Read initial packet 403 */ 404 mrec = read_mrecord(0, &error, &pickup); 405 if (mrec == NULL) { 406 if (error == 0) 407 fprintf(stderr, "validate_mrec_header: short read\n"); 408 exit(1); 409 } 410 /* 411 * Validate packet 412 */ 413 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 414 return; 415 } 416 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 417 fprintf(stderr, "validate_mrec_header: did not get expected " 418 "PFSD record type\n"); 419 exit(1); 420 } 421 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 422 fprintf(stderr, "validate_mrec_header: unexpected payload " 423 "size\n"); 424 exit(1); 425 } 426 427 /* 428 * Create slave PFS if it doesn't yet exist 429 */ 430 if (lstat(filesystem, &st) != 0) { 431 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 432 } 433 free(mrec); 434 mrec = NULL; 435 436 fd = getpfs(&pfs, filesystem); 437 438 /* 439 * In two-way mode the target writes out a PFS packet first. 440 * The source uses our tid_end as its tid_beg by default, 441 * picking up where it left off. 442 */ 443 mirror.tid_beg = 0; 444 if (TwoWayPipeOpt) { 445 generate_mrec_header(fd, 1, pfs.pfs_id, 446 &mirror.tid_beg, &mirror.tid_end); 447 } 448 449 /* 450 * Read and process the PFS header. The source informs us of 451 * the TID range the stream represents. 452 */ 453 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 454 &mirror.tid_beg, &mirror.tid_end); 455 if (n < 0) { /* got TERM record */ 456 relpfs(fd, &pfs); 457 return; 458 } 459 460 mirror.ubuf = buf; 461 mirror.size = SERIALBUF_SIZE; 462 463 /* 464 * Read and process bulk records (REC, PASS, and SKIP types). 465 * 466 * On your life, do NOT mess with mirror.key_cur or your mirror 467 * target may become history. 468 */ 469 for (;;) { 470 mirror.count = 0; 471 mirror.pfs_id = pfs.pfs_id; 472 mirror.shared_uuid = pfs.ondisk->shared_uuid; 473 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 474 if (mirror.size <= 0) 475 break; 476 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 477 fprintf(stderr, "Mirror-write %s failed: %s\n", 478 filesystem, strerror(errno)); 479 exit(1); 480 } 481 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 482 fprintf(stderr, 483 "Mirror-write %s fatal error %d\n", 484 filesystem, mirror.head.error); 485 exit(1); 486 } 487 #if 0 488 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 489 fprintf(stderr, 490 "Mirror-write %s interrupted by timer at" 491 " %016llx\n", 492 filesystem, 493 mirror.key_cur.obj_id); 494 exit(0); 495 } 496 #endif 497 } 498 499 /* 500 * Read and process the termination sync record. 501 */ 502 mrec = read_mrecord(0, &error, &pickup); 503 504 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 505 fprintf(stderr, "Mirror-write: received termination request\n"); 506 free(mrec); 507 return; 508 } 509 510 if (mrec == NULL || 511 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 512 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 513 mrec->head.rec_size != sizeof(mrec->sync)) { 514 fprintf(stderr, "Mirror-write %s: Did not get termination " 515 "sync record, or rec_size is wrong rt=%d\n", 516 filesystem, mrec->head.type); 517 exit(1); 518 } 519 520 /* 521 * Update the PFS info on the target so the user has visibility 522 * into the new snapshot, and sync the target filesystem. 523 */ 524 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 525 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 526 527 bzero(&synctid, sizeof(synctid)); 528 synctid.op = HAMMER_SYNCTID_SYNC2; 529 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 530 531 if (VerboseOpt >= 2) { 532 fprintf(stderr, "Mirror-write %s: succeeded\n", 533 filesystem); 534 } 535 } 536 537 free(mrec); 538 mrec = NULL; 539 540 /* 541 * Report back to the originator. 542 */ 543 if (TwoWayPipeOpt) { 544 mrec_tmp.update.tid = mirror.tid_end; 545 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 546 &mrec_tmp, sizeof(mrec_tmp.update)); 547 } else { 548 printf("Source can update synctid to 0x%016llx\n", 549 mirror.tid_end); 550 } 551 relpfs(fd, &pfs); 552 goto again; 553 } 554 555 void 556 hammer_cmd_mirror_dump(void) 557 { 558 char *buf = malloc(SERIALBUF_SIZE); 559 struct hammer_ioc_mrecord_head pickup; 560 hammer_ioc_mrecord_any_t mrec; 561 int error; 562 int size; 563 int offset; 564 int bytes; 565 566 /* 567 * Read and process the PFS header 568 */ 569 pickup.signature = 0; 570 pickup.type = 0; 571 572 mrec = read_mrecord(0, &error, &pickup); 573 574 /* 575 * Read and process bulk records 576 */ 577 for (;;) { 578 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 579 if (size <= 0) 580 break; 581 offset = 0; 582 while (offset < size) { 583 mrec = (void *)((char *)buf + offset); 584 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 585 if (offset + bytes > size) { 586 fprintf(stderr, "Misaligned record\n"); 587 exit(1); 588 } 589 590 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 591 case HAMMER_MREC_TYPE_REC_BADCRC: 592 case HAMMER_MREC_TYPE_REC: 593 printf("Record obj=%016llx key=%016llx " 594 "rt=%02x ot=%02x", 595 mrec->rec.leaf.base.obj_id, 596 mrec->rec.leaf.base.key, 597 mrec->rec.leaf.base.rec_type, 598 mrec->rec.leaf.base.obj_type); 599 if (mrec->head.type == 600 HAMMER_MREC_TYPE_REC_BADCRC) { 601 printf(" (BAD CRC)"); 602 } 603 printf("\n"); 604 printf(" tids %016llx:%016llx data=%d\n", 605 mrec->rec.leaf.base.create_tid, 606 mrec->rec.leaf.base.delete_tid, 607 mrec->rec.leaf.data_len); 608 break; 609 case HAMMER_MREC_TYPE_PASS: 610 printf("Pass obj=%016llx key=%016llx " 611 "rt=%02x ot=%02x\n", 612 mrec->rec.leaf.base.obj_id, 613 mrec->rec.leaf.base.key, 614 mrec->rec.leaf.base.rec_type, 615 mrec->rec.leaf.base.obj_type); 616 printf(" tids %016llx:%016llx data=%d\n", 617 mrec->rec.leaf.base.create_tid, 618 mrec->rec.leaf.base.delete_tid, 619 mrec->rec.leaf.data_len); 620 break; 621 case HAMMER_MREC_TYPE_SKIP: 622 printf("Skip obj=%016llx key=%016llx rt=%02x to\n" 623 " obj=%016llx key=%016llx rt=%02x\n", 624 mrec->skip.skip_beg.obj_id, 625 mrec->skip.skip_beg.key, 626 mrec->skip.skip_beg.rec_type, 627 mrec->skip.skip_end.obj_id, 628 mrec->skip.skip_end.key, 629 mrec->skip.skip_end.rec_type); 630 default: 631 break; 632 } 633 offset += bytes; 634 } 635 } 636 637 /* 638 * Read and process the termination sync record. 639 */ 640 mrec = read_mrecord(0, &error, &pickup); 641 if (mrec == NULL || 642 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 643 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 644 ) { 645 fprintf(stderr, "Mirror-dump: Did not get termination " 646 "sync record\n"); 647 } 648 } 649 650 void 651 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 652 { 653 pid_t pid1; 654 pid_t pid2; 655 int fds[2]; 656 const char *xav[16]; 657 char tbuf[16]; 658 char *ptr; 659 int xac; 660 661 if (ac != 2) 662 mirror_usage(1); 663 664 if (pipe(fds) < 0) { 665 perror("pipe"); 666 exit(1); 667 } 668 669 TwoWayPipeOpt = 1; 670 671 /* 672 * Source 673 */ 674 if ((pid1 = fork()) == 0) { 675 dup2(fds[0], 0); 676 dup2(fds[0], 1); 677 close(fds[0]); 678 close(fds[1]); 679 if ((ptr = strchr(av[0], ':')) != NULL) { 680 *ptr++ = 0; 681 xac = 0; 682 xav[xac++] = "ssh"; 683 xav[xac++] = av[0]; 684 xav[xac++] = "hammer"; 685 686 switch(VerboseOpt) { 687 case 0: 688 break; 689 case 1: 690 xav[xac++] = "-v"; 691 break; 692 case 2: 693 xav[xac++] = "-vv"; 694 break; 695 default: 696 xav[xac++] = "-vvv"; 697 break; 698 } 699 if (ForceYesOpt) { 700 xav[xac++] = "-y"; 701 } 702 xav[xac++] = "-2"; 703 if (TimeoutOpt) { 704 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 705 xav[xac++] = "-t"; 706 xav[xac++] = tbuf; 707 } 708 if (streaming) 709 xav[xac++] = "mirror-read-stream"; 710 else 711 xav[xac++] = "mirror-read"; 712 xav[xac++] = ptr; 713 xav[xac++] = NULL; 714 execv("/usr/bin/ssh", (void *)xav); 715 } else { 716 hammer_cmd_mirror_read(av, 1, streaming); 717 fflush(stdout); 718 fflush(stderr); 719 } 720 _exit(1); 721 } 722 723 /* 724 * Target 725 */ 726 if ((pid2 = fork()) == 0) { 727 dup2(fds[1], 0); 728 dup2(fds[1], 1); 729 close(fds[0]); 730 close(fds[1]); 731 if ((ptr = strchr(av[1], ':')) != NULL) { 732 *ptr++ = 0; 733 xac = 0; 734 xav[xac++] = "ssh"; 735 xav[xac++] = av[1]; 736 xav[xac++] = "hammer"; 737 738 switch(VerboseOpt) { 739 case 0: 740 break; 741 case 1: 742 xav[xac++] = "-v"; 743 break; 744 case 2: 745 xav[xac++] = "-vv"; 746 break; 747 default: 748 xav[xac++] = "-vvv"; 749 break; 750 } 751 if (ForceYesOpt) { 752 xav[xac++] = "-y"; 753 } 754 xav[xac++] = "-2"; 755 xav[xac++] = "mirror-write"; 756 xav[xac++] = ptr; 757 xav[xac++] = NULL; 758 execv("/usr/bin/ssh", (void *)xav); 759 } else { 760 hammer_cmd_mirror_write(av + 1, 1); 761 fflush(stdout); 762 fflush(stderr); 763 } 764 _exit(1); 765 } 766 close(fds[0]); 767 close(fds[1]); 768 769 while (waitpid(pid1, NULL, 0) <= 0) 770 ; 771 while (waitpid(pid2, NULL, 0) <= 0) 772 ; 773 } 774 775 /* 776 * Read and return multiple mrecords 777 */ 778 static int 779 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 780 { 781 hammer_ioc_mrecord_any_t mrec; 782 u_int count; 783 size_t n; 784 size_t i; 785 size_t bytes; 786 int type; 787 788 count = 0; 789 while (size - count >= HAMMER_MREC_HEADSIZE) { 790 /* 791 * Cached the record header in case we run out of buffer 792 * space. 793 */ 794 fflush(stdout); 795 if (pickup->signature == 0) { 796 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 797 i = read(fd, (char *)pickup + n, 798 HAMMER_MREC_HEADSIZE - n); 799 if (i <= 0) 800 break; 801 } 802 if (n == 0) 803 break; 804 if (n != HAMMER_MREC_HEADSIZE) { 805 fprintf(stderr, "read_mrecords: short read on pipe\n"); 806 exit(1); 807 } 808 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 809 fprintf(stderr, "read_mrecords: malformed record on pipe, " 810 "bad signature\n"); 811 exit(1); 812 } 813 } 814 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 815 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 816 fprintf(stderr, "read_mrecords: malformed record on pipe, " 817 "illegal rec_size\n"); 818 exit(1); 819 } 820 821 /* 822 * Stop if we have insufficient space for the record and data. 823 */ 824 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 825 if (size - count < bytes) 826 break; 827 828 /* 829 * Stop if the record type is not a REC, SKIP, or PASS, 830 * which are the only types the ioctl supports. Other types 831 * are used only by the userland protocol. 832 * 833 * Ignore all flags. 834 */ 835 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 836 if (type != HAMMER_MREC_TYPE_PFSD && 837 type != HAMMER_MREC_TYPE_REC && 838 type != HAMMER_MREC_TYPE_SKIP && 839 type != HAMMER_MREC_TYPE_PASS) { 840 break; 841 } 842 843 /* 844 * Read the remainder and clear the pickup signature. 845 */ 846 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 847 i = read(fd, buf + count + n, bytes - n); 848 if (i <= 0) 849 break; 850 } 851 if (n != bytes) { 852 fprintf(stderr, "read_mrecords: short read on pipe\n"); 853 exit(1); 854 } 855 856 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 857 pickup->signature = 0; 858 pickup->type = 0; 859 mrec = (void *)(buf + count); 860 861 /* 862 * Validate the completed record 863 */ 864 if (mrec->head.rec_crc != 865 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 866 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 867 fprintf(stderr, "read_mrecords: malformed record " 868 "on pipe, bad crc\n"); 869 exit(1); 870 } 871 872 /* 873 * If its a B-Tree record validate the data crc. 874 * 875 * NOTE: If the VFS passes us an explicitly errorde mrec 876 * we just pass it through. 877 */ 878 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 879 880 if (type == HAMMER_MREC_TYPE_REC) { 881 if (mrec->head.rec_size < 882 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 883 fprintf(stderr, 884 "read_mrecords: malformed record on " 885 "pipe, illegal element data_len\n"); 886 exit(1); 887 } 888 if (mrec->rec.leaf.data_len && 889 mrec->rec.leaf.data_offset && 890 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 891 fprintf(stderr, 892 "read_mrecords: data_crc did not " 893 "match data! obj=%016llx key=%016llx\n", 894 mrec->rec.leaf.base.obj_id, 895 mrec->rec.leaf.base.key); 896 fprintf(stderr, 897 "continuing, but there are problems\n"); 898 } 899 } 900 count += bytes; 901 } 902 return(count); 903 } 904 905 /* 906 * Read and return a single mrecord. 907 */ 908 static 909 hammer_ioc_mrecord_any_t 910 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 911 { 912 hammer_ioc_mrecord_any_t mrec; 913 struct hammer_ioc_mrecord_head mrechd; 914 size_t bytes; 915 size_t n; 916 size_t i; 917 918 if (pickup && pickup->type != 0) { 919 mrechd = *pickup; 920 pickup->signature = 0; 921 pickup->type = 0; 922 n = HAMMER_MREC_HEADSIZE; 923 } else { 924 /* 925 * Read in the PFSD header from the sender. 926 */ 927 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 928 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 929 if (i <= 0) 930 break; 931 } 932 if (n == 0) { 933 *errorp = 0; /* EOF */ 934 return(NULL); 935 } 936 if (n != HAMMER_MREC_HEADSIZE) { 937 fprintf(stderr, "short read of mrecord header\n"); 938 *errorp = EPIPE; 939 return(NULL); 940 } 941 } 942 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 943 fprintf(stderr, "read_mrecord: bad signature\n"); 944 *errorp = EINVAL; 945 return(NULL); 946 } 947 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 948 assert(bytes >= sizeof(mrechd)); 949 mrec = malloc(bytes); 950 mrec->head = mrechd; 951 952 while (n < bytes) { 953 i = read(fdin, (char *)mrec + n, bytes - n); 954 if (i <= 0) 955 break; 956 n += i; 957 } 958 if (n != bytes) { 959 fprintf(stderr, "read_mrecord: short read on payload\n"); 960 *errorp = EPIPE; 961 return(NULL); 962 } 963 if (mrec->head.rec_crc != 964 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 965 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 966 fprintf(stderr, "read_mrecord: bad CRC\n"); 967 *errorp = EINVAL; 968 return(NULL); 969 } 970 *errorp = 0; 971 return(mrec); 972 } 973 974 static 975 void 976 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 977 int bytes) 978 { 979 char zbuf[HAMMER_HEAD_ALIGN]; 980 int pad; 981 982 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 983 984 assert(bytes >= (int)sizeof(mrec->head)); 985 bzero(&mrec->head, sizeof(mrec->head)); 986 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 987 mrec->head.type = type; 988 mrec->head.rec_size = bytes; 989 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 990 bytes - HAMMER_MREC_CRCOFF); 991 if (write(fdout, mrec, bytes) != bytes) { 992 fprintf(stderr, "write_mrecord: error %d (%s)\n", 993 errno, strerror(errno)); 994 exit(1); 995 } 996 if (pad) { 997 bzero(zbuf, pad); 998 if (write(fdout, zbuf, pad) != pad) { 999 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1000 errno, strerror(errno)); 1001 exit(1); 1002 } 1003 } 1004 } 1005 1006 /* 1007 * Generate a mirroring header with the pfs information of the 1008 * originating filesytem. 1009 */ 1010 static void 1011 generate_mrec_header(int fd, int fdout, int pfs_id, 1012 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1013 { 1014 struct hammer_ioc_pseudofs_rw pfs; 1015 union hammer_ioc_mrecord_any mrec_tmp; 1016 1017 bzero(&pfs, sizeof(pfs)); 1018 bzero(&mrec_tmp, sizeof(mrec_tmp)); 1019 pfs.pfs_id = pfs_id; 1020 pfs.ondisk = &mrec_tmp.pfs.pfsd; 1021 pfs.bytes = sizeof(mrec_tmp.pfs.pfsd); 1022 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1023 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1024 exit(1); 1025 } 1026 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1027 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1028 exit(1); 1029 } 1030 1031 /* 1032 * sync_beg_tid - lowest TID on source after which a full history 1033 * is available. 1034 * 1035 * sync_end_tid - highest fully synchronized TID from source. 1036 */ 1037 if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid) 1038 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid; 1039 if (tid_endp) 1040 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid; 1041 mrec_tmp.pfs.version = pfs.version; 1042 write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD, 1043 &mrec_tmp, sizeof(mrec_tmp.pfs)); 1044 } 1045 1046 /* 1047 * Validate the pfs information from the originating filesystem 1048 * against the target filesystem. shared_uuid must match. 1049 * 1050 * return -1 if we got a TERM record 1051 */ 1052 static int 1053 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1054 struct hammer_ioc_mrecord_head *pickup, 1055 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1056 { 1057 struct hammer_ioc_pseudofs_rw pfs; 1058 struct hammer_pseudofs_data pfsd; 1059 hammer_ioc_mrecord_any_t mrec; 1060 int error; 1061 1062 /* 1063 * Get the PFSD info from the target filesystem. 1064 */ 1065 bzero(&pfs, sizeof(pfs)); 1066 bzero(&pfsd, sizeof(pfsd)); 1067 pfs.pfs_id = pfs_id; 1068 pfs.ondisk = &pfsd; 1069 pfs.bytes = sizeof(pfsd); 1070 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1071 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1072 exit(1); 1073 } 1074 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1075 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1076 exit(1); 1077 } 1078 1079 mrec = read_mrecord(fdin, &error, pickup); 1080 if (mrec == NULL) { 1081 if (error == 0) 1082 fprintf(stderr, "validate_mrec_header: short read\n"); 1083 exit(1); 1084 } 1085 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1086 free(mrec); 1087 return(-1); 1088 } 1089 1090 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1091 fprintf(stderr, "validate_mrec_header: did not get expected " 1092 "PFSD record type\n"); 1093 exit(1); 1094 } 1095 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1096 fprintf(stderr, "validate_mrec_header: unexpected payload " 1097 "size\n"); 1098 exit(1); 1099 } 1100 if (mrec->pfs.version != pfs.version) { 1101 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1102 exit(1); 1103 } 1104 1105 /* 1106 * Whew. Ok, is the read PFS info compatible with the target? 1107 */ 1108 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1109 sizeof(pfsd.shared_uuid)) != 0) { 1110 fprintf(stderr, 1111 "mirror-write: source and target have " 1112 "different shared-uuid's!\n"); 1113 exit(1); 1114 } 1115 if (is_target && 1116 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1117 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1118 exit(1); 1119 } 1120 if (tid_begp) 1121 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1122 if (tid_endp) 1123 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1124 free(mrec); 1125 return(0); 1126 } 1127 1128 static void 1129 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1130 { 1131 struct hammer_ioc_pseudofs_rw pfs; 1132 struct hammer_pseudofs_data pfsd; 1133 1134 bzero(&pfs, sizeof(pfs)); 1135 bzero(&pfsd, sizeof(pfsd)); 1136 pfs.pfs_id = pfs_id; 1137 pfs.ondisk = &pfsd; 1138 pfs.bytes = sizeof(pfsd); 1139 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1140 perror("update_pfs_snapshot (read)"); 1141 exit(1); 1142 } 1143 if (pfsd.sync_end_tid != snapshot_tid) { 1144 pfsd.sync_end_tid = snapshot_tid; 1145 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1146 perror("update_pfs_snapshot (rewrite)"); 1147 exit(1); 1148 } 1149 if (VerboseOpt >= 2) { 1150 fprintf(stderr, 1151 "Mirror-write: Completed, updated snapshot " 1152 "to %016llx\n", 1153 snapshot_tid); 1154 } 1155 } 1156 } 1157 1158 /* 1159 * Bandwidth-limited write in chunks 1160 */ 1161 static 1162 ssize_t 1163 writebw(int fd, const void *buf, size_t nbytes, 1164 u_int64_t *bwcount, struct timeval *tv1) 1165 { 1166 struct timeval tv2; 1167 size_t n; 1168 ssize_t r; 1169 ssize_t a; 1170 int usec; 1171 1172 a = 0; 1173 r = 0; 1174 while (nbytes) { 1175 if (*bwcount + nbytes > BandwidthOpt) 1176 n = BandwidthOpt - *bwcount; 1177 else 1178 n = nbytes; 1179 if (n) 1180 r = write(fd, buf, n); 1181 if (r >= 0) { 1182 a += r; 1183 nbytes -= r; 1184 buf = (const char *)buf + r; 1185 } 1186 if ((size_t)r != n) 1187 break; 1188 *bwcount += n; 1189 if (*bwcount >= BandwidthOpt) { 1190 gettimeofday(&tv2, NULL); 1191 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1192 (int)(tv2.tv_usec - tv1->tv_usec); 1193 if (usec >= 0 && usec < 1000000) 1194 usleep(1000000 - usec); 1195 gettimeofday(tv1, NULL); 1196 *bwcount -= BandwidthOpt; 1197 } 1198 } 1199 return(a ? a : r); 1200 } 1201 1202 /* 1203 * Get a yes or no answer from the terminal. The program may be run as 1204 * part of a two-way pipe so we cannot use stdin for this operation. 1205 */ 1206 static int 1207 getyn(void) 1208 { 1209 char buf[256]; 1210 FILE *fp; 1211 int result; 1212 1213 fp = fopen("/dev/tty", "r"); 1214 if (fp == NULL) { 1215 fprintf(stderr, "No terminal for response\n"); 1216 return(-1); 1217 } 1218 result = -1; 1219 while (fgets(buf, sizeof(buf), fp) != NULL) { 1220 if (buf[0] == 'y' || buf[0] == 'Y') { 1221 result = 1; 1222 break; 1223 } 1224 if (buf[0] == 'n' || buf[0] == 'N') { 1225 result = 0; 1226 break; 1227 } 1228 fprintf(stderr, "Response not understood\n"); 1229 break; 1230 } 1231 fclose(fp); 1232 return(result); 1233 } 1234 1235 static void 1236 mirror_usage(int code) 1237 { 1238 fprintf(stderr, 1239 "hammer mirror-read <filesystem> [begin-tid]\n" 1240 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1241 "hammer mirror-write <filesystem>\n" 1242 "hammer mirror-dump\n" 1243 "hammer mirror-copy [[user@]host:]<filesystem>" 1244 " [[user@]host:]<filesystem>\n" 1245 "hammer mirror-stream [[user@]host:]<filesystem>" 1246 " [[user@]host:]<filesystem>\n" 1247 ); 1248 exit(code); 1249 } 1250 1251