1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.12 2008/07/31 06:01:31 dillon Exp $ 35 */ 36 37 #include "hammer.h" 38 39 #define SERIALBUF_SIZE (512 * 1024) 40 41 static int read_mrecords(int fd, char *buf, u_int size, 42 hammer_ioc_mrecord_head_t pickup); 43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 44 hammer_ioc_mrecord_head_t pickup); 45 static void write_mrecord(int fdout, u_int32_t type, 46 hammer_ioc_mrecord_any_t mrec, int bytes); 47 static void generate_mrec_header(int fd, int fdout, int pfs_id, 48 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 50 struct hammer_ioc_mrecord_head *pickup, 51 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 53 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 54 u_int64_t *bwcount, struct timeval *tv1); 55 static void mirror_usage(int code); 56 57 /* 58 * Generate a mirroring data stream from the specific source over the 59 * entire key range, but restricted to the specified transaction range. 60 * 61 * The HAMMER VFS does most of the work, we add a few new mrecord 62 * types to negotiate the TID ranges and verify that the entire 63 * stream made it to the destination. 64 */ 65 void 66 hammer_cmd_mirror_read(char **av, int ac, int streaming) 67 { 68 struct hammer_ioc_mirror_rw mirror; 69 struct hammer_ioc_pseudofs_rw pfs; 70 union hammer_ioc_mrecord_any mrec_tmp; 71 struct hammer_ioc_mrecord_head pickup; 72 hammer_ioc_mrecord_any_t mrec; 73 hammer_tid_t sync_tid; 74 const char *filesystem; 75 char *buf = malloc(SERIALBUF_SIZE); 76 int interrupted = 0; 77 int error; 78 int fd; 79 int n; 80 int didwork; 81 int64_t total_bytes; 82 time_t base_t = time(NULL); 83 struct timeval bwtv; 84 u_int64_t bwcount; 85 86 if (ac > 2) 87 mirror_usage(1); 88 filesystem = av[0]; 89 90 pickup.signature = 0; 91 pickup.type = 0; 92 93 again: 94 bzero(&mirror, sizeof(mirror)); 95 hammer_key_beg_init(&mirror.key_beg); 96 hammer_key_end_init(&mirror.key_end); 97 98 fd = getpfs(&pfs, filesystem); 99 100 if (streaming && VerboseOpt) { 101 fprintf(stderr, "\nRunning"); 102 fflush(stderr); 103 } 104 total_bytes = 0; 105 gettimeofday(&bwtv, NULL); 106 bwcount = 0; 107 108 /* 109 * In 2-way mode the target will send us a PFS info packet 110 * first. Use the target's current snapshot TID as our default 111 * begin TID. 112 */ 113 mirror.tid_beg = 0; 114 if (TwoWayPipeOpt) { 115 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 116 NULL, &mirror.tid_beg); 117 if (n < 0) { /* got TERM record */ 118 relpfs(fd, &pfs); 119 return; 120 } 121 ++mirror.tid_beg; 122 } 123 124 /* 125 * Write out the PFS header, tid_beg will be updated if our PFS 126 * has a larger begin sync. tid_end is set to the latest source 127 * TID whos flush cycle has completed. 128 */ 129 generate_mrec_header(fd, 1, pfs.pfs_id, 130 &mirror.tid_beg, &mirror.tid_end); 131 132 /* XXX streaming mode support w/ cycle or command line arg */ 133 /* 134 * A cycle file overrides the beginning TID 135 */ 136 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 137 138 if (ac == 2) 139 mirror.tid_beg = strtoull(av[1], NULL, 0); 140 141 if (streaming == 0 || VerboseOpt >= 2) { 142 fprintf(stderr, 143 "Mirror-read: Mirror from %016llx to %016llx\n", 144 mirror.tid_beg, mirror.tid_end); 145 } 146 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 147 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n", 148 mirror.key_beg.obj_id); 149 } 150 151 /* 152 * Nothing to do if begin equals end. 153 */ 154 if (mirror.tid_beg >= mirror.tid_end) { 155 if (streaming == 0 || VerboseOpt >= 2) 156 fprintf(stderr, "Mirror-read: No work to do\n"); 157 didwork = 0; 158 goto done; 159 } 160 didwork = 1; 161 162 /* 163 * Write out bulk records 164 */ 165 mirror.ubuf = buf; 166 mirror.size = SERIALBUF_SIZE; 167 168 do { 169 mirror.count = 0; 170 mirror.pfs_id = pfs.pfs_id; 171 mirror.shared_uuid = pfs.ondisk->shared_uuid; 172 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 173 fprintf(stderr, "Mirror-read %s failed: %s\n", 174 filesystem, strerror(errno)); 175 exit(1); 176 } 177 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 178 fprintf(stderr, 179 "Mirror-read %s fatal error %d\n", 180 filesystem, mirror.head.error); 181 exit(1); 182 } 183 if (mirror.count) { 184 if (BandwidthOpt) { 185 n = writebw(1, mirror.ubuf, mirror.count, 186 &bwcount, &bwtv); 187 } else { 188 n = write(1, mirror.ubuf, mirror.count); 189 } 190 if (n != mirror.count) { 191 fprintf(stderr, "Mirror-read %s failed: " 192 "short write\n", 193 filesystem); 194 exit(1); 195 } 196 } 197 total_bytes += mirror.count; 198 if (streaming && VerboseOpt) { 199 fprintf(stderr, "\r%016llx %11lld", 200 mirror.key_cur.obj_id, 201 total_bytes); 202 fflush(stderr); 203 } 204 mirror.key_beg = mirror.key_cur; 205 if (TimeoutOpt && 206 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 207 fprintf(stderr, 208 "Mirror-read %s interrupted by timer at" 209 " %016llx\n", 210 filesystem, 211 mirror.key_cur.obj_id); 212 interrupted = 1; 213 break; 214 } 215 } while (mirror.count != 0); 216 217 done: 218 /* 219 * Write out the termination sync record - only if not interrupted 220 */ 221 if (interrupted == 0) { 222 if (didwork) { 223 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 224 &mrec_tmp, sizeof(mrec_tmp.sync)); 225 } else { 226 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 227 &mrec_tmp, sizeof(mrec_tmp.sync)); 228 } 229 } 230 231 /* 232 * If the -2 option was given (automatic when doing mirror-copy), 233 * a two-way pipe is assumed and we expect a response mrec from 234 * the target. 235 */ 236 if (TwoWayPipeOpt) { 237 mrec = read_mrecord(0, &error, &pickup); 238 if (mrec == NULL || 239 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 240 mrec->head.rec_size != sizeof(mrec->update)) { 241 fprintf(stderr, "mirror_read: Did not get final " 242 "acknowledgement packet from target\n"); 243 exit(1); 244 } 245 if (interrupted) { 246 if (CyclePath) { 247 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg); 248 fprintf(stderr, "Cyclefile %s updated for continuation\n", CyclePath); 249 } 250 } else { 251 sync_tid = mrec->update.tid; 252 if (CyclePath) { 253 hammer_key_beg_init(&mirror.key_beg); 254 hammer_set_cycle(&mirror.key_beg, sync_tid); 255 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n", 256 CyclePath, sync_tid); 257 } 258 } 259 } else if (CyclePath) { 260 /* NOTE! mirror.tid_beg cannot be updated */ 261 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 262 "fully updated unless you use mirror-copy\n"); 263 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 264 } 265 if (streaming && interrupted == 0) { 266 time_t t1 = time(NULL); 267 time_t t2; 268 269 if (VerboseOpt) { 270 fprintf(stderr, " W"); 271 fflush(stderr); 272 } 273 pfs.ondisk->sync_end_tid = mirror.tid_end; 274 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 275 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 276 filesystem, strerror(errno)); 277 } else { 278 t2 = time(NULL) - t1; 279 if (t2 >= 0 && t2 < DelayOpt) { 280 if (VerboseOpt) { 281 fprintf(stderr, "\bD"); 282 fflush(stderr); 283 } 284 sleep(DelayOpt - t2); 285 } 286 if (VerboseOpt) { 287 fprintf(stderr, "\b "); 288 fflush(stderr); 289 } 290 relpfs(fd, &pfs); 291 goto again; 292 } 293 } 294 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 295 &mrec_tmp, sizeof(mrec_tmp.sync)); 296 relpfs(fd, &pfs); 297 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 298 } 299 300 /* 301 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 302 * some additional packet types to negotiate TID ranges and to verify 303 * completion. The HAMMER VFS does most of the work. 304 * 305 * It is important to note that the mirror.key_{beg,end} range must 306 * match the ranged used by the original. For now both sides use 307 * range the entire key space. 308 * 309 * It is even more important that the records in the stream conform 310 * to the TID range also supplied in the stream. The HAMMER VFS will 311 * use the REC, PASS, and SKIP record types to track the portions of 312 * the B-Tree being scanned in order to be able to proactively delete 313 * records on the target within those active areas that are not mentioned 314 * by the source. 315 * 316 * The mirror.key_cur field is used by the VFS to do this tracking. It 317 * must be initialized to key_beg but then is persistently updated by 318 * the HAMMER VFS on each successive ioctl() call. If you blow up this 319 * field you will blow up the mirror target, possibly to the point of 320 * deleting everything. As a safety measure the HAMMER VFS simply marks 321 * the records that the source has destroyed as deleted on the target, 322 * and normal pruning operations will deal with their final disposition 323 * at some later time. 324 */ 325 void 326 hammer_cmd_mirror_write(char **av, int ac) 327 { 328 struct hammer_ioc_mirror_rw mirror; 329 const char *filesystem; 330 char *buf = malloc(SERIALBUF_SIZE); 331 struct hammer_ioc_pseudofs_rw pfs; 332 struct hammer_ioc_mrecord_head pickup; 333 struct hammer_ioc_synctid synctid; 334 union hammer_ioc_mrecord_any mrec_tmp; 335 hammer_ioc_mrecord_any_t mrec; 336 int error; 337 int fd; 338 int n; 339 340 if (ac > 2) 341 mirror_usage(1); 342 filesystem = av[0]; 343 344 pickup.signature = 0; 345 pickup.type = 0; 346 347 again: 348 bzero(&mirror, sizeof(mirror)); 349 hammer_key_beg_init(&mirror.key_beg); 350 hammer_key_end_init(&mirror.key_end); 351 mirror.key_end = mirror.key_beg; 352 353 fd = getpfs(&pfs, filesystem); 354 355 /* 356 * In two-way mode the target writes out a PFS packet first. 357 * The source uses our tid_end as its tid_beg by default, 358 * picking up where it left off. 359 */ 360 mirror.tid_beg = 0; 361 if (TwoWayPipeOpt) { 362 generate_mrec_header(fd, 1, pfs.pfs_id, 363 &mirror.tid_beg, &mirror.tid_end); 364 } 365 366 /* 367 * Read and process the PFS header. The source informs us of 368 * the TID range the stream represents. 369 */ 370 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 371 &mirror.tid_beg, &mirror.tid_end); 372 if (n < 0) { /* got TERM record */ 373 relpfs(fd, &pfs); 374 return; 375 } 376 377 mirror.ubuf = buf; 378 mirror.size = SERIALBUF_SIZE; 379 380 /* 381 * Read and process bulk records (REC, PASS, and SKIP types). 382 * 383 * On your life, do NOT mess with mirror.key_cur or your mirror 384 * target may become history. 385 */ 386 for (;;) { 387 mirror.count = 0; 388 mirror.pfs_id = pfs.pfs_id; 389 mirror.shared_uuid = pfs.ondisk->shared_uuid; 390 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 391 if (mirror.size <= 0) 392 break; 393 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 394 fprintf(stderr, "Mirror-write %s failed: %s\n", 395 filesystem, strerror(errno)); 396 exit(1); 397 } 398 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 399 fprintf(stderr, 400 "Mirror-write %s fatal error %d\n", 401 filesystem, mirror.head.error); 402 exit(1); 403 } 404 #if 0 405 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 406 fprintf(stderr, 407 "Mirror-write %s interrupted by timer at" 408 " %016llx\n", 409 filesystem, 410 mirror.key_cur.obj_id); 411 exit(0); 412 } 413 #endif 414 } 415 416 /* 417 * Read and process the termination sync record. 418 */ 419 mrec = read_mrecord(0, &error, &pickup); 420 421 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 422 fprintf(stderr, "Mirror-write: received termination request\n"); 423 free(mrec); 424 return; 425 } 426 427 if (mrec == NULL || 428 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 429 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 430 mrec->head.rec_size != sizeof(mrec->sync)) { 431 fprintf(stderr, "Mirror-write %s: Did not get termination " 432 "sync record, or rec_size is wrong rt=%d\n", 433 filesystem, mrec->head.type); 434 exit(1); 435 } 436 437 /* 438 * Update the PFS info on the target so the user has visibility 439 * into the new snapshot, and sync the target filesystem. 440 */ 441 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 442 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 443 444 bzero(&synctid, sizeof(synctid)); 445 synctid.op = HAMMER_SYNCTID_SYNC2; 446 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 447 448 if (VerboseOpt >= 2) { 449 fprintf(stderr, "Mirror-write %s: succeeded\n", 450 filesystem); 451 } 452 } 453 454 free(mrec); 455 mrec = NULL; 456 457 /* 458 * Report back to the originator. 459 */ 460 if (TwoWayPipeOpt) { 461 mrec_tmp.update.tid = mirror.tid_end; 462 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 463 &mrec_tmp, sizeof(mrec_tmp.update)); 464 } else { 465 printf("Source can update synctid to 0x%016llx\n", 466 mirror.tid_end); 467 } 468 relpfs(fd, &pfs); 469 goto again; 470 } 471 472 void 473 hammer_cmd_mirror_dump(void) 474 { 475 char *buf = malloc(SERIALBUF_SIZE); 476 struct hammer_ioc_mrecord_head pickup; 477 hammer_ioc_mrecord_any_t mrec; 478 int error; 479 int size; 480 int offset; 481 int bytes; 482 483 /* 484 * Read and process the PFS header 485 */ 486 pickup.signature = 0; 487 pickup.type = 0; 488 489 mrec = read_mrecord(0, &error, &pickup); 490 491 /* 492 * Read and process bulk records 493 */ 494 for (;;) { 495 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 496 if (size <= 0) 497 break; 498 offset = 0; 499 while (offset < size) { 500 mrec = (void *)((char *)buf + offset); 501 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 502 if (offset + bytes > size) { 503 fprintf(stderr, "Misaligned record\n"); 504 exit(1); 505 } 506 507 switch(mrec->head.type) { 508 case HAMMER_MREC_TYPE_REC: 509 printf("Record obj=%016llx key=%016llx " 510 "rt=%02x ot=%02x\n", 511 mrec->rec.leaf.base.obj_id, 512 mrec->rec.leaf.base.key, 513 mrec->rec.leaf.base.rec_type, 514 mrec->rec.leaf.base.obj_type); 515 printf(" tids %016llx:%016llx data=%d\n", 516 mrec->rec.leaf.base.create_tid, 517 mrec->rec.leaf.base.delete_tid, 518 mrec->rec.leaf.data_len); 519 break; 520 case HAMMER_MREC_TYPE_PASS: 521 printf("Pass obj=%016llx key=%016llx " 522 "rt=%02x ot=%02x\n", 523 mrec->rec.leaf.base.obj_id, 524 mrec->rec.leaf.base.key, 525 mrec->rec.leaf.base.rec_type, 526 mrec->rec.leaf.base.obj_type); 527 printf(" tids %016llx:%016llx data=%d\n", 528 mrec->rec.leaf.base.create_tid, 529 mrec->rec.leaf.base.delete_tid, 530 mrec->rec.leaf.data_len); 531 break; 532 case HAMMER_MREC_TYPE_SKIP: 533 printf("Skip obj=%016llx key=%016llx rt=%02x to\n" 534 " obj=%016llx key=%016llx rt=%02x\n", 535 mrec->skip.skip_beg.obj_id, 536 mrec->skip.skip_beg.key, 537 mrec->skip.skip_beg.rec_type, 538 mrec->skip.skip_end.obj_id, 539 mrec->skip.skip_end.key, 540 mrec->skip.skip_end.rec_type); 541 default: 542 break; 543 } 544 offset += bytes; 545 } 546 } 547 548 /* 549 * Read and process the termination sync record. 550 */ 551 mrec = read_mrecord(0, &error, &pickup); 552 if (mrec == NULL || 553 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 554 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 555 ) { 556 fprintf(stderr, "Mirror-dump: Did not get termination " 557 "sync record\n"); 558 } 559 } 560 561 void 562 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 563 { 564 pid_t pid1; 565 pid_t pid2; 566 int fds[2]; 567 const char *xav[16]; 568 char tbuf[16]; 569 char *ptr; 570 int xac; 571 572 if (ac != 2) 573 mirror_usage(1); 574 575 if (pipe(fds) < 0) { 576 perror("pipe"); 577 exit(1); 578 } 579 580 TwoWayPipeOpt = 1; 581 582 /* 583 * Source 584 */ 585 if ((pid1 = fork()) == 0) { 586 dup2(fds[0], 0); 587 dup2(fds[0], 1); 588 close(fds[0]); 589 close(fds[1]); 590 if ((ptr = strchr(av[0], ':')) != NULL) { 591 *ptr++ = 0; 592 xac = 0; 593 xav[xac++] = "ssh"; 594 xav[xac++] = av[0]; 595 xav[xac++] = "hammer"; 596 597 switch(VerboseOpt) { 598 case 0: 599 break; 600 case 1: 601 xav[xac++] = "-v"; 602 break; 603 case 2: 604 xav[xac++] = "-vv"; 605 break; 606 default: 607 xav[xac++] = "-vvv"; 608 break; 609 } 610 xav[xac++] = "-2"; 611 if (TimeoutOpt) { 612 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 613 xav[xac++] = "-t"; 614 xav[xac++] = tbuf; 615 } 616 if (streaming) 617 xav[xac++] = "mirror-read-streaming"; 618 else 619 xav[xac++] = "mirror-read"; 620 xav[xac++] = ptr; 621 xav[xac++] = NULL; 622 execv("/usr/bin/ssh", (void *)xav); 623 } else { 624 hammer_cmd_mirror_read(av, 1, streaming); 625 fflush(stdout); 626 fflush(stderr); 627 } 628 _exit(1); 629 } 630 631 /* 632 * Target 633 */ 634 if ((pid2 = fork()) == 0) { 635 dup2(fds[1], 0); 636 dup2(fds[1], 1); 637 close(fds[0]); 638 close(fds[1]); 639 if ((ptr = strchr(av[1], ':')) != NULL) { 640 *ptr++ = 0; 641 xac = 0; 642 xav[xac++] = "ssh"; 643 xav[xac++] = av[1]; 644 xav[xac++] = "hammer"; 645 646 switch(VerboseOpt) { 647 case 0: 648 break; 649 case 1: 650 xav[xac++] = "-v"; 651 break; 652 case 2: 653 xav[xac++] = "-vv"; 654 break; 655 default: 656 xav[xac++] = "-vvv"; 657 break; 658 } 659 660 xav[xac++] = "-2"; 661 xav[xac++] = "mirror-write"; 662 xav[xac++] = ptr; 663 xav[xac++] = NULL; 664 execv("/usr/bin/ssh", (void *)xav); 665 } else { 666 hammer_cmd_mirror_write(av + 1, 1); 667 fflush(stdout); 668 fflush(stderr); 669 } 670 _exit(1); 671 } 672 close(fds[0]); 673 close(fds[1]); 674 675 while (waitpid(pid1, NULL, 0) <= 0) 676 ; 677 while (waitpid(pid2, NULL, 0) <= 0) 678 ; 679 } 680 681 /* 682 * Read and return multiple mrecords 683 */ 684 static int 685 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 686 { 687 hammer_ioc_mrecord_any_t mrec; 688 u_int count; 689 size_t n; 690 size_t i; 691 size_t bytes; 692 693 count = 0; 694 while (size - count >= HAMMER_MREC_HEADSIZE) { 695 /* 696 * Cached the record header in case we run out of buffer 697 * space. 698 */ 699 fflush(stdout); 700 if (pickup->signature == 0) { 701 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 702 i = read(fd, (char *)pickup + n, 703 HAMMER_MREC_HEADSIZE - n); 704 if (i <= 0) 705 break; 706 } 707 if (n == 0) 708 break; 709 if (n != HAMMER_MREC_HEADSIZE) { 710 fprintf(stderr, "read_mrecords: short read on pipe\n"); 711 exit(1); 712 } 713 714 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 715 fprintf(stderr, "read_mrecords: malformed record on pipe, bad signature\n"); 716 exit(1); 717 } 718 } 719 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 720 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 721 fprintf(stderr, "read_mrecords: malformed record on pipe, illegal rec_size\n"); 722 exit(1); 723 } 724 725 /* 726 * Stop if we have insufficient space for the record and data. 727 */ 728 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 729 if (size - count < bytes) 730 break; 731 732 /* 733 * Stop if the record type is not a REC or a SKIP (the only 734 * two types the ioctl supports. Other types are used only 735 * by the userland protocol). 736 */ 737 if (pickup->type != HAMMER_MREC_TYPE_REC && 738 pickup->type != HAMMER_MREC_TYPE_SKIP && 739 pickup->type != HAMMER_MREC_TYPE_PASS) { 740 break; 741 } 742 743 /* 744 * Read the remainder and clear the pickup signature. 745 */ 746 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 747 i = read(fd, buf + count + n, bytes - n); 748 if (i <= 0) 749 break; 750 } 751 if (n != bytes) { 752 fprintf(stderr, "read_mrecords: short read on pipe\n"); 753 exit(1); 754 } 755 756 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 757 pickup->signature = 0; 758 pickup->type = 0; 759 mrec = (void *)(buf + count); 760 761 /* 762 * Validate the completed record 763 */ 764 if (mrec->head.rec_crc != 765 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 766 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 767 fprintf(stderr, "read_mrecords: malformed record " 768 "on pipe, bad crc\n"); 769 exit(1); 770 } 771 772 /* 773 * If its a B-Tree record validate the data crc 774 */ 775 if (mrec->head.type == HAMMER_MREC_TYPE_REC) { 776 if (mrec->head.rec_size < 777 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 778 fprintf(stderr, 779 "read_mrecords: malformed record on " 780 "pipe, illegal element data_len\n"); 781 exit(1); 782 } 783 if (mrec->rec.leaf.data_len && 784 mrec->rec.leaf.data_offset && 785 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 786 fprintf(stderr, 787 "read_mrecords: data_crc did not " 788 "match data! obj=%016llx key=%016llx\n", 789 mrec->rec.leaf.base.obj_id, 790 mrec->rec.leaf.base.key); 791 fprintf(stderr, 792 "continuing, but there are problems\n"); 793 } 794 } 795 count += bytes; 796 } 797 return(count); 798 } 799 800 /* 801 * Read and return a single mrecord. 802 */ 803 static 804 hammer_ioc_mrecord_any_t 805 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 806 { 807 hammer_ioc_mrecord_any_t mrec; 808 struct hammer_ioc_mrecord_head mrechd; 809 size_t bytes; 810 size_t n; 811 size_t i; 812 813 if (pickup && pickup->type != 0) { 814 mrechd = *pickup; 815 pickup->signature = 0; 816 pickup->type = 0; 817 n = HAMMER_MREC_HEADSIZE; 818 } else { 819 /* 820 * Read in the PFSD header from the sender. 821 */ 822 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 823 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 824 if (i <= 0) 825 break; 826 } 827 if (n == 0) { 828 *errorp = 0; /* EOF */ 829 return(NULL); 830 } 831 if (n != HAMMER_MREC_HEADSIZE) { 832 fprintf(stderr, "short read of mrecord header\n"); 833 *errorp = EPIPE; 834 return(NULL); 835 } 836 } 837 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 838 fprintf(stderr, "read_mrecord: bad signature\n"); 839 *errorp = EINVAL; 840 return(NULL); 841 } 842 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 843 assert(bytes >= sizeof(mrechd)); 844 mrec = malloc(bytes); 845 mrec->head = mrechd; 846 847 while (n < bytes) { 848 i = read(fdin, (char *)mrec + n, bytes - n); 849 if (i <= 0) 850 break; 851 n += i; 852 } 853 if (n != bytes) { 854 fprintf(stderr, "read_mrecord: short read on payload\n"); 855 *errorp = EPIPE; 856 return(NULL); 857 } 858 if (mrec->head.rec_crc != 859 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 860 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 861 fprintf(stderr, "read_mrecord: bad CRC\n"); 862 *errorp = EINVAL; 863 return(NULL); 864 } 865 *errorp = 0; 866 return(mrec); 867 } 868 869 static 870 void 871 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 872 int bytes) 873 { 874 char zbuf[HAMMER_HEAD_ALIGN]; 875 int pad; 876 877 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 878 879 assert(bytes >= (int)sizeof(mrec->head)); 880 bzero(&mrec->head, sizeof(mrec->head)); 881 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 882 mrec->head.type = type; 883 mrec->head.rec_size = bytes; 884 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 885 bytes - HAMMER_MREC_CRCOFF); 886 if (write(fdout, mrec, bytes) != bytes) { 887 fprintf(stderr, "write_mrecord: error %d (%s)\n", 888 errno, strerror(errno)); 889 exit(1); 890 } 891 if (pad) { 892 bzero(zbuf, pad); 893 if (write(fdout, zbuf, pad) != pad) { 894 fprintf(stderr, "write_mrecord: error %d (%s)\n", 895 errno, strerror(errno)); 896 exit(1); 897 } 898 } 899 } 900 901 /* 902 * Generate a mirroring header with the pfs information of the 903 * originating filesytem. 904 */ 905 static void 906 generate_mrec_header(int fd, int fdout, int pfs_id, 907 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 908 { 909 struct hammer_ioc_pseudofs_rw pfs; 910 union hammer_ioc_mrecord_any mrec_tmp; 911 912 bzero(&pfs, sizeof(pfs)); 913 bzero(&mrec_tmp, sizeof(mrec_tmp)); 914 pfs.pfs_id = pfs_id; 915 pfs.ondisk = &mrec_tmp.pfs.pfsd; 916 pfs.bytes = sizeof(mrec_tmp.pfs.pfsd); 917 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 918 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 919 exit(1); 920 } 921 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 922 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 923 exit(1); 924 } 925 926 /* 927 * sync_beg_tid - lowest TID on source after which a full history 928 * is available. 929 * 930 * sync_end_tid - highest fully synchronized TID from source. 931 */ 932 if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid) 933 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid; 934 if (tid_endp) 935 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid; 936 mrec_tmp.pfs.version = pfs.version; 937 write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD, 938 &mrec_tmp, sizeof(mrec_tmp.pfs)); 939 } 940 941 /* 942 * Validate the pfs information from the originating filesystem 943 * against the target filesystem. shared_uuid must match. 944 * 945 * return -1 if we got a TERM record 946 */ 947 static int 948 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 949 struct hammer_ioc_mrecord_head *pickup, 950 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 951 { 952 struct hammer_ioc_pseudofs_rw pfs; 953 struct hammer_pseudofs_data pfsd; 954 hammer_ioc_mrecord_any_t mrec; 955 int error; 956 957 /* 958 * Get the PFSD info from the target filesystem. 959 */ 960 bzero(&pfs, sizeof(pfs)); 961 bzero(&pfsd, sizeof(pfsd)); 962 pfs.pfs_id = pfs_id; 963 pfs.ondisk = &pfsd; 964 pfs.bytes = sizeof(pfsd); 965 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 966 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 967 exit(1); 968 } 969 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 970 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 971 exit(1); 972 } 973 974 mrec = read_mrecord(fdin, &error, pickup); 975 if (mrec == NULL) { 976 if (error == 0) 977 fprintf(stderr, "validate_mrec_header: short read\n"); 978 exit(1); 979 } 980 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 981 free(mrec); 982 return(-1); 983 } 984 985 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 986 fprintf(stderr, "validate_mrec_header: did not get expected " 987 "PFSD record type\n"); 988 exit(1); 989 } 990 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 991 fprintf(stderr, "validate_mrec_header: unexpected payload " 992 "size\n"); 993 exit(1); 994 } 995 if (mrec->pfs.version != pfs.version) { 996 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 997 exit(1); 998 } 999 1000 /* 1001 * Whew. Ok, is the read PFS info compatible with the target? 1002 */ 1003 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1004 sizeof(pfsd.shared_uuid)) != 0) { 1005 fprintf(stderr, 1006 "mirror-write: source and target have " 1007 "different shared-uuid's!\n"); 1008 exit(1); 1009 } 1010 if (is_target && 1011 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1012 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1013 exit(1); 1014 } 1015 if (tid_begp) 1016 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1017 if (tid_endp) 1018 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1019 free(mrec); 1020 return(0); 1021 } 1022 1023 static void 1024 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1025 { 1026 struct hammer_ioc_pseudofs_rw pfs; 1027 struct hammer_pseudofs_data pfsd; 1028 1029 bzero(&pfs, sizeof(pfs)); 1030 bzero(&pfsd, sizeof(pfsd)); 1031 pfs.pfs_id = pfs_id; 1032 pfs.ondisk = &pfsd; 1033 pfs.bytes = sizeof(pfsd); 1034 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1035 perror("update_pfs_snapshot (read)"); 1036 exit(1); 1037 } 1038 if (pfsd.sync_end_tid != snapshot_tid) { 1039 pfsd.sync_end_tid = snapshot_tid; 1040 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1041 perror("update_pfs_snapshot (rewrite)"); 1042 exit(1); 1043 } 1044 if (VerboseOpt >= 2) { 1045 fprintf(stderr, 1046 "Mirror-write: Completed, updated snapshot " 1047 "to %016llx\n", 1048 snapshot_tid); 1049 } 1050 } 1051 } 1052 1053 /* 1054 * Bandwidth-limited write in chunks 1055 */ 1056 static 1057 ssize_t 1058 writebw(int fd, const void *buf, size_t nbytes, 1059 u_int64_t *bwcount, struct timeval *tv1) 1060 { 1061 struct timeval tv2; 1062 size_t n; 1063 ssize_t r; 1064 ssize_t a; 1065 int usec; 1066 1067 a = 0; 1068 r = 0; 1069 while (nbytes) { 1070 if (*bwcount + nbytes > BandwidthOpt) 1071 n = BandwidthOpt - *bwcount; 1072 else 1073 n = nbytes; 1074 if (n) 1075 r = write(fd, buf, n); 1076 if (r >= 0) { 1077 a += r; 1078 nbytes -= r; 1079 buf = (const char *)buf + r; 1080 } 1081 if ((size_t)r != n) 1082 break; 1083 *bwcount += n; 1084 if (*bwcount >= BandwidthOpt) { 1085 gettimeofday(&tv2, NULL); 1086 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1087 (int)(tv2.tv_usec - tv1->tv_usec); 1088 if (usec >= 0 && usec < 1000000) 1089 usleep(1000000 - usec); 1090 gettimeofday(tv1, NULL); 1091 *bwcount -= BandwidthOpt; 1092 } 1093 } 1094 return(a ? a : r); 1095 } 1096 1097 static void 1098 mirror_usage(int code) 1099 { 1100 fprintf(stderr, 1101 "hammer mirror-read <filesystem>\n" 1102 "hammer mirror-write <filesystem>\n" 1103 "hammer mirror-dump\n" 1104 "hammer mirror-copy [[user@]host:]fs [[user@]host:]fs\n" 1105 ); 1106 exit(code); 1107 } 1108