1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.16 2008/11/09 05:22:56 dillon Exp $ 35 */ 36 37 #include "hammer.h" 38 39 #define SERIALBUF_SIZE (512 * 1024) 40 41 static int read_mrecords(int fd, char *buf, u_int size, 42 hammer_ioc_mrecord_head_t pickup); 43 static int generate_histogram(int fd, const char *filesystem, 44 hammer_tid_t **histogram_ary, 45 struct hammer_ioc_mirror_rw *mirror_base); 46 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 47 hammer_ioc_mrecord_head_t pickup); 48 static void write_mrecord(int fdout, u_int32_t type, 49 hammer_ioc_mrecord_any_t mrec, int bytes); 50 static void generate_mrec_header(int fd, int pfs_id, 51 union hammer_ioc_mrecord_any *mrec_tmp); 52 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 53 struct hammer_ioc_mrecord_head *pickup, 54 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 55 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 56 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 57 u_int64_t *bwcount, struct timeval *tv1); 58 static int getyn(void); 59 static void mirror_usage(int code); 60 61 #define BULK_MINIMUM 20000 62 63 /* 64 * Generate a mirroring data stream from the specific source over the 65 * entire key range, but restricted to the specified transaction range. 66 * 67 * The HAMMER VFS does most of the work, we add a few new mrecord 68 * types to negotiate the TID ranges and verify that the entire 69 * stream made it to the destination. 70 */ 71 void 72 hammer_cmd_mirror_read(char **av, int ac, int streaming) 73 { 74 struct hammer_ioc_mirror_rw mirror; 75 struct hammer_ioc_pseudofs_rw pfs; 76 union hammer_ioc_mrecord_any mrec_tmp; 77 struct hammer_ioc_mrecord_head pickup; 78 hammer_ioc_mrecord_any_t mrec; 79 hammer_tid_t sync_tid; 80 hammer_tid_t *histogram_ary; 81 const char *filesystem; 82 char *buf = malloc(SERIALBUF_SIZE); 83 int interrupted = 0; 84 int error; 85 int fd; 86 int n; 87 int didwork; 88 int histogram; 89 int64_t total_bytes; 90 time_t base_t = time(NULL); 91 struct timeval bwtv; 92 u_int64_t bwcount; 93 94 if (ac == 0 || ac > 2) 95 mirror_usage(1); 96 filesystem = av[0]; 97 98 pickup.signature = 0; 99 pickup.type = 0; 100 histogram = -1; 101 histogram_ary = NULL; 102 103 again: 104 bzero(&mirror, sizeof(mirror)); 105 hammer_key_beg_init(&mirror.key_beg); 106 hammer_key_end_init(&mirror.key_end); 107 108 fd = getpfs(&pfs, filesystem); 109 110 if (streaming && VerboseOpt) { 111 fprintf(stderr, "\nRunning"); 112 fflush(stderr); 113 } 114 total_bytes = 0; 115 gettimeofday(&bwtv, NULL); 116 bwcount = 0; 117 118 /* 119 * Send initial header for the purpose of determining the 120 * shared-uuid. 121 */ 122 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 123 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 124 &mrec_tmp, sizeof(mrec_tmp.pfs)); 125 126 /* 127 * In 2-way mode the target will send us a PFS info packet 128 * first. Use the target's current snapshot TID as our default 129 * begin TID. 130 */ 131 mirror.tid_beg = 0; 132 if (TwoWayPipeOpt) { 133 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 134 NULL, &mirror.tid_beg); 135 if (n < 0) { /* got TERM record */ 136 relpfs(fd, &pfs); 137 return; 138 } 139 ++mirror.tid_beg; 140 } 141 142 /* 143 * Write out the PFS header, tid_beg will be updated if our PFS 144 * has a larger begin sync. tid_end is set to the latest source 145 * TID whos flush cycle has completed. 146 */ 147 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 148 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 149 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 150 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 151 mirror.ubuf = buf; 152 mirror.size = SERIALBUF_SIZE; 153 mirror.pfs_id = pfs.pfs_id; 154 mirror.shared_uuid = pfs.ondisk->shared_uuid; 155 156 /* 157 * XXX If the histogram is exhausted and the TID delta is large 158 * the stream might have been offline for a while and is 159 * now picking it up again. Do another histogram. 160 */ 161 #if 0 162 if (TwoWayPipeOpt && streaming && histogram == 0) { 163 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 164 histogram = -1; 165 } 166 #endif 167 168 /* 169 * Initial bulk startup control, try to do some incremental 170 * mirroring in order to allow the stream to be killed and 171 * restarted without having to start over. 172 */ 173 if (histogram < 0 && BulkOpt == 0) { 174 if (VerboseOpt) 175 fprintf(stderr, "\n"); 176 histogram = generate_histogram(fd, filesystem, 177 &histogram_ary, &mirror); 178 } 179 180 if (TwoWayPipeOpt && streaming && histogram > 0) { 181 mirror.tid_end = histogram_ary[--histogram]; 182 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 183 } 184 185 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 186 &mrec_tmp, sizeof(mrec_tmp.pfs)); 187 188 /* 189 * A cycle file overrides the beginning TID only if we are 190 * not operating in two-way mode. 191 */ 192 if (TwoWayPipeOpt == 0) { 193 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 194 } 195 196 /* 197 * An additional argument overrides the beginning TID regardless 198 * of what mode we are in. This is not recommending if operating 199 * in two-way mode. 200 */ 201 if (ac == 2) 202 mirror.tid_beg = strtoull(av[1], NULL, 0); 203 204 if (streaming == 0 || VerboseOpt >= 2) { 205 fprintf(stderr, 206 "Mirror-read: Mirror from %016llx to %016llx\n", 207 mirror.tid_beg, mirror.tid_end); 208 } 209 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 210 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n", 211 mirror.key_beg.obj_id); 212 } 213 214 /* 215 * Nothing to do if begin equals end. 216 */ 217 if (mirror.tid_beg >= mirror.tid_end) { 218 if (streaming == 0 || VerboseOpt >= 2) 219 fprintf(stderr, "Mirror-read: No work to do\n"); 220 didwork = 0; 221 goto done; 222 } 223 didwork = 1; 224 225 /* 226 * Write out bulk records 227 */ 228 mirror.ubuf = buf; 229 mirror.size = SERIALBUF_SIZE; 230 231 do { 232 mirror.count = 0; 233 mirror.pfs_id = pfs.pfs_id; 234 mirror.shared_uuid = pfs.ondisk->shared_uuid; 235 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 236 fprintf(stderr, "Mirror-read %s failed: %s\n", 237 filesystem, strerror(errno)); 238 exit(1); 239 } 240 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 241 fprintf(stderr, 242 "Mirror-read %s fatal error %d\n", 243 filesystem, mirror.head.error); 244 exit(1); 245 } 246 if (mirror.count) { 247 if (BandwidthOpt) { 248 n = writebw(1, mirror.ubuf, mirror.count, 249 &bwcount, &bwtv); 250 } else { 251 n = write(1, mirror.ubuf, mirror.count); 252 } 253 if (n != mirror.count) { 254 fprintf(stderr, "Mirror-read %s failed: " 255 "short write\n", 256 filesystem); 257 exit(1); 258 } 259 } 260 total_bytes += mirror.count; 261 if (streaming && VerboseOpt) { 262 fprintf(stderr, 263 "\robj=%016llx tids=%016llx:%016llx %11lld", 264 (long long)mirror.key_cur.obj_id, 265 (long long)mirror.tid_beg, 266 (long long)mirror.tid_end, 267 total_bytes); 268 fflush(stderr); 269 } 270 mirror.key_beg = mirror.key_cur; 271 272 /* 273 * Deal with time limit option 274 */ 275 if (TimeoutOpt && 276 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 277 fprintf(stderr, 278 "Mirror-read %s interrupted by timer at" 279 " %016llx\n", 280 filesystem, 281 mirror.key_cur.obj_id); 282 interrupted = 1; 283 break; 284 } 285 } while (mirror.count != 0); 286 287 done: 288 /* 289 * Write out the termination sync record - only if not interrupted 290 */ 291 if (interrupted == 0) { 292 if (didwork) { 293 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 294 &mrec_tmp, sizeof(mrec_tmp.sync)); 295 } else { 296 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 297 &mrec_tmp, sizeof(mrec_tmp.sync)); 298 } 299 } 300 301 /* 302 * If the -2 option was given (automatic when doing mirror-copy), 303 * a two-way pipe is assumed and we expect a response mrec from 304 * the target. 305 */ 306 if (TwoWayPipeOpt) { 307 mrec = read_mrecord(0, &error, &pickup); 308 if (mrec == NULL || 309 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 310 mrec->head.rec_size != sizeof(mrec->update)) { 311 fprintf(stderr, "mirror_read: Did not get final " 312 "acknowledgement packet from target\n"); 313 exit(1); 314 } 315 if (interrupted) { 316 if (CyclePath) { 317 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg); 318 fprintf(stderr, "Cyclefile %s updated for " 319 "continuation\n", CyclePath); 320 } 321 } else { 322 sync_tid = mrec->update.tid; 323 if (CyclePath) { 324 hammer_key_beg_init(&mirror.key_beg); 325 hammer_set_cycle(&mirror.key_beg, sync_tid); 326 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n", 327 CyclePath, sync_tid); 328 } 329 } 330 } else if (CyclePath) { 331 /* NOTE! mirror.tid_beg cannot be updated */ 332 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 333 "fully updated unless you use mirror-copy\n"); 334 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 335 } 336 if (streaming && interrupted == 0) { 337 time_t t1 = time(NULL); 338 time_t t2; 339 340 /* 341 * Two way streaming tries to break down large bulk 342 * transfers into smaller ones so it can sync the 343 * transaction id on the slave. This way if we get 344 * interrupted a restart doesn't have to start from 345 * scratch. 346 */ 347 if (TwoWayPipeOpt && streaming && histogram > 0) { 348 if (VerboseOpt) 349 fprintf(stderr, " (bulk incremental)"); 350 goto again; 351 } 352 353 if (VerboseOpt) { 354 fprintf(stderr, " W"); 355 fflush(stderr); 356 } 357 pfs.ondisk->sync_end_tid = mirror.tid_end; 358 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 359 fprintf(stderr, "Mirror-read %s: cannot stream: %s\n", 360 filesystem, strerror(errno)); 361 } else { 362 t2 = time(NULL) - t1; 363 if (t2 >= 0 && t2 < DelayOpt) { 364 if (VerboseOpt) { 365 fprintf(stderr, "\bD"); 366 fflush(stderr); 367 } 368 sleep(DelayOpt - t2); 369 } 370 if (VerboseOpt) { 371 fprintf(stderr, "\b "); 372 fflush(stderr); 373 } 374 relpfs(fd, &pfs); 375 goto again; 376 } 377 } 378 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 379 &mrec_tmp, sizeof(mrec_tmp.sync)); 380 relpfs(fd, &pfs); 381 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 382 } 383 384 /* 385 * Ok, this isn't really a histogram. What we are trying to do 386 * here is find the first tid_end for the scan that returns 387 * at least some data. The frontend of the TID space will generally 388 * return nothing so we can't just divide out the full mirroring 389 * range. Once we find the point where a real data stream starts 390 * to get generated we can divide out the range from that point. 391 * 392 * When starting a new mirroring operation completely from scratch 393 * this code will take some time to run, but once some mirroring 394 * data is synchronized on the target you will be able to interrupt 395 * the stream and restart it and the later invocations of this 396 * code will be such that it should run much faster. 397 */ 398 static int 399 generate_histogram(int fd, const char *filesystem, 400 hammer_tid_t **histogram_ary, 401 struct hammer_ioc_mirror_rw *mirror_base) 402 { 403 struct hammer_ioc_mirror_rw mirror; 404 hammer_tid_t tid_beg; 405 hammer_tid_t tid_end; 406 hammer_tid_t tid_half; 407 int i; 408 409 mirror = *mirror_base; 410 tid_beg = mirror.tid_beg; 411 tid_end = mirror.tid_end; 412 413 if (*histogram_ary) 414 free(*histogram_ary); 415 if (tid_beg + BULK_MINIMUM >= tid_end) 416 return(0); 417 418 if (VerboseOpt) 419 fprintf(stderr, "Doing Range Test\n"); 420 while (tid_end - tid_beg > BULK_MINIMUM) { 421 tid_half = tid_beg + (tid_end - tid_beg) * 2 / 3; 422 mirror.count = 0; 423 mirror.tid_beg = tid_beg; 424 mirror.tid_end = tid_half; 425 426 if (VerboseOpt > 1) { 427 fprintf(stderr, "RangeTest %016llx/%016llx - %016llx (%lld) ", 428 (long long)tid_beg, 429 (long long)tid_end, 430 (long long)tid_half, 431 (long long)(tid_half - tid_beg)); 432 } 433 fflush(stderr); 434 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 435 fprintf(stderr, "Mirror-read %s failed: %s\n", 436 filesystem, strerror(errno)); 437 exit(1); 438 } 439 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 440 fprintf(stderr, 441 "Mirror-read %s fatal error %d\n", 442 filesystem, mirror.head.error); 443 exit(1); 444 } 445 if (VerboseOpt > 1) 446 fprintf(stderr, "%d\n", mirror.count); 447 if (mirror.count > SERIALBUF_SIZE / 2) { 448 tid_end = tid_half; 449 } else { 450 tid_beg = tid_half; 451 } 452 } 453 454 tid_end = mirror_base->tid_end; 455 fprintf(stderr, "histogram range %016llx - %016llx\n", 456 (long long)tid_beg, (long long)tid_end); 457 458 /* 459 * The final array generates our incremental ending tids in 460 * reverse order. The caller also picks them off in reverse order. 461 */ 462 *histogram_ary = malloc(sizeof(hammer_tid_t) * 20); 463 for (i = 0; i < 20; ++i) { 464 (*histogram_ary)[i] = tid_end - (tid_end - tid_beg) / 20 * i; 465 } 466 return(20); 467 } 468 469 static void 470 create_pfs(const char *filesystem, uuid_t *s_uuid) 471 { 472 if (ForceYesOpt == 1) { 473 fprintf(stderr, "PFS slave %s does not exist. " 474 "Auto create new slave PFS!\n", filesystem); 475 476 } else { 477 fprintf(stderr, "PFS slave %s does not exist.\n" 478 "Do you want to create a new slave PFS? (yes|no) ", 479 filesystem); 480 fflush(stderr); 481 if (getyn() != 1) { 482 fprintf(stderr, "Aborting operation\n"); 483 exit(1); 484 } 485 } 486 487 u_int32_t status; 488 char *shared_uuid = NULL; 489 uuid_to_string(s_uuid, &shared_uuid, &status); 490 491 char *cmd = NULL; 492 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 493 filesystem, shared_uuid); 494 free(shared_uuid); 495 496 if (cmd == NULL) { 497 fprintf(stderr, "Failed to alloc memory\n"); 498 exit(1); 499 } 500 if (system(cmd) != 0) { 501 fprintf(stderr, "Failed to create PFS\n"); 502 } 503 free(cmd); 504 } 505 506 /* 507 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 508 * some additional packet types to negotiate TID ranges and to verify 509 * completion. The HAMMER VFS does most of the work. 510 * 511 * It is important to note that the mirror.key_{beg,end} range must 512 * match the ranged used by the original. For now both sides use 513 * range the entire key space. 514 * 515 * It is even more important that the records in the stream conform 516 * to the TID range also supplied in the stream. The HAMMER VFS will 517 * use the REC, PASS, and SKIP record types to track the portions of 518 * the B-Tree being scanned in order to be able to proactively delete 519 * records on the target within those active areas that are not mentioned 520 * by the source. 521 * 522 * The mirror.key_cur field is used by the VFS to do this tracking. It 523 * must be initialized to key_beg but then is persistently updated by 524 * the HAMMER VFS on each successive ioctl() call. If you blow up this 525 * field you will blow up the mirror target, possibly to the point of 526 * deleting everything. As a safety measure the HAMMER VFS simply marks 527 * the records that the source has destroyed as deleted on the target, 528 * and normal pruning operations will deal with their final disposition 529 * at some later time. 530 */ 531 void 532 hammer_cmd_mirror_write(char **av, int ac) 533 { 534 struct hammer_ioc_mirror_rw mirror; 535 const char *filesystem; 536 char *buf = malloc(SERIALBUF_SIZE); 537 struct hammer_ioc_pseudofs_rw pfs; 538 struct hammer_ioc_mrecord_head pickup; 539 struct hammer_ioc_synctid synctid; 540 union hammer_ioc_mrecord_any mrec_tmp; 541 hammer_ioc_mrecord_any_t mrec; 542 struct stat st; 543 int error; 544 int fd; 545 int n; 546 547 if (ac != 1) 548 mirror_usage(1); 549 filesystem = av[0]; 550 551 pickup.signature = 0; 552 pickup.type = 0; 553 554 again: 555 bzero(&mirror, sizeof(mirror)); 556 hammer_key_beg_init(&mirror.key_beg); 557 hammer_key_end_init(&mirror.key_end); 558 mirror.key_end = mirror.key_beg; 559 560 /* 561 * Read initial packet 562 */ 563 mrec = read_mrecord(0, &error, &pickup); 564 if (mrec == NULL) { 565 if (error == 0) 566 fprintf(stderr, "validate_mrec_header: short read\n"); 567 exit(1); 568 } 569 /* 570 * Validate packet 571 */ 572 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 573 return; 574 } 575 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 576 fprintf(stderr, "validate_mrec_header: did not get expected " 577 "PFSD record type\n"); 578 exit(1); 579 } 580 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 581 fprintf(stderr, "validate_mrec_header: unexpected payload " 582 "size\n"); 583 exit(1); 584 } 585 586 /* 587 * Create slave PFS if it doesn't yet exist 588 */ 589 if (lstat(filesystem, &st) != 0) { 590 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 591 } 592 free(mrec); 593 mrec = NULL; 594 595 fd = getpfs(&pfs, filesystem); 596 597 /* 598 * In two-way mode the target writes out a PFS packet first. 599 * The source uses our tid_end as its tid_beg by default, 600 * picking up where it left off. 601 */ 602 mirror.tid_beg = 0; 603 if (TwoWayPipeOpt) { 604 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 605 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 606 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 607 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 608 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 609 &mrec_tmp, sizeof(mrec_tmp.pfs)); 610 } 611 612 /* 613 * Read and process the PFS header. The source informs us of 614 * the TID range the stream represents. 615 */ 616 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 617 &mirror.tid_beg, &mirror.tid_end); 618 if (n < 0) { /* got TERM record */ 619 relpfs(fd, &pfs); 620 return; 621 } 622 623 mirror.ubuf = buf; 624 mirror.size = SERIALBUF_SIZE; 625 626 /* 627 * Read and process bulk records (REC, PASS, and SKIP types). 628 * 629 * On your life, do NOT mess with mirror.key_cur or your mirror 630 * target may become history. 631 */ 632 for (;;) { 633 mirror.count = 0; 634 mirror.pfs_id = pfs.pfs_id; 635 mirror.shared_uuid = pfs.ondisk->shared_uuid; 636 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 637 if (mirror.size <= 0) 638 break; 639 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 640 fprintf(stderr, "Mirror-write %s failed: %s\n", 641 filesystem, strerror(errno)); 642 exit(1); 643 } 644 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 645 fprintf(stderr, 646 "Mirror-write %s fatal error %d\n", 647 filesystem, mirror.head.error); 648 exit(1); 649 } 650 #if 0 651 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 652 fprintf(stderr, 653 "Mirror-write %s interrupted by timer at" 654 " %016llx\n", 655 filesystem, 656 mirror.key_cur.obj_id); 657 exit(0); 658 } 659 #endif 660 } 661 662 /* 663 * Read and process the termination sync record. 664 */ 665 mrec = read_mrecord(0, &error, &pickup); 666 667 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 668 fprintf(stderr, "Mirror-write: received termination request\n"); 669 free(mrec); 670 return; 671 } 672 673 if (mrec == NULL || 674 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 675 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 676 mrec->head.rec_size != sizeof(mrec->sync)) { 677 fprintf(stderr, "Mirror-write %s: Did not get termination " 678 "sync record, or rec_size is wrong rt=%d\n", 679 filesystem, mrec->head.type); 680 exit(1); 681 } 682 683 /* 684 * Update the PFS info on the target so the user has visibility 685 * into the new snapshot, and sync the target filesystem. 686 */ 687 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 688 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 689 690 bzero(&synctid, sizeof(synctid)); 691 synctid.op = HAMMER_SYNCTID_SYNC2; 692 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 693 694 if (VerboseOpt >= 2) { 695 fprintf(stderr, "Mirror-write %s: succeeded\n", 696 filesystem); 697 } 698 } 699 700 free(mrec); 701 mrec = NULL; 702 703 /* 704 * Report back to the originator. 705 */ 706 if (TwoWayPipeOpt) { 707 mrec_tmp.update.tid = mirror.tid_end; 708 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 709 &mrec_tmp, sizeof(mrec_tmp.update)); 710 } else { 711 printf("Source can update synctid to 0x%016llx\n", 712 mirror.tid_end); 713 } 714 relpfs(fd, &pfs); 715 goto again; 716 } 717 718 void 719 hammer_cmd_mirror_dump(void) 720 { 721 char *buf = malloc(SERIALBUF_SIZE); 722 struct hammer_ioc_mrecord_head pickup; 723 hammer_ioc_mrecord_any_t mrec; 724 int error; 725 int size; 726 int offset; 727 int bytes; 728 729 /* 730 * Read and process the PFS header 731 */ 732 pickup.signature = 0; 733 pickup.type = 0; 734 735 mrec = read_mrecord(0, &error, &pickup); 736 737 /* 738 * Read and process bulk records 739 */ 740 for (;;) { 741 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 742 if (size <= 0) 743 break; 744 offset = 0; 745 while (offset < size) { 746 mrec = (void *)((char *)buf + offset); 747 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 748 if (offset + bytes > size) { 749 fprintf(stderr, "Misaligned record\n"); 750 exit(1); 751 } 752 753 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 754 case HAMMER_MREC_TYPE_REC_BADCRC: 755 case HAMMER_MREC_TYPE_REC: 756 printf("Record obj=%016llx key=%016llx " 757 "rt=%02x ot=%02x", 758 mrec->rec.leaf.base.obj_id, 759 mrec->rec.leaf.base.key, 760 mrec->rec.leaf.base.rec_type, 761 mrec->rec.leaf.base.obj_type); 762 if (mrec->head.type == 763 HAMMER_MREC_TYPE_REC_BADCRC) { 764 printf(" (BAD CRC)"); 765 } 766 printf("\n"); 767 printf(" tids %016llx:%016llx data=%d\n", 768 mrec->rec.leaf.base.create_tid, 769 mrec->rec.leaf.base.delete_tid, 770 mrec->rec.leaf.data_len); 771 break; 772 case HAMMER_MREC_TYPE_PASS: 773 printf("Pass obj=%016llx key=%016llx " 774 "rt=%02x ot=%02x\n", 775 mrec->rec.leaf.base.obj_id, 776 mrec->rec.leaf.base.key, 777 mrec->rec.leaf.base.rec_type, 778 mrec->rec.leaf.base.obj_type); 779 printf(" tids %016llx:%016llx data=%d\n", 780 mrec->rec.leaf.base.create_tid, 781 mrec->rec.leaf.base.delete_tid, 782 mrec->rec.leaf.data_len); 783 break; 784 case HAMMER_MREC_TYPE_SKIP: 785 printf("Skip obj=%016llx key=%016llx rt=%02x to\n" 786 " obj=%016llx key=%016llx rt=%02x\n", 787 mrec->skip.skip_beg.obj_id, 788 mrec->skip.skip_beg.key, 789 mrec->skip.skip_beg.rec_type, 790 mrec->skip.skip_end.obj_id, 791 mrec->skip.skip_end.key, 792 mrec->skip.skip_end.rec_type); 793 default: 794 break; 795 } 796 offset += bytes; 797 } 798 } 799 800 /* 801 * Read and process the termination sync record. 802 */ 803 mrec = read_mrecord(0, &error, &pickup); 804 if (mrec == NULL || 805 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 806 mrec->head.type != HAMMER_MREC_TYPE_IDLE) 807 ) { 808 fprintf(stderr, "Mirror-dump: Did not get termination " 809 "sync record\n"); 810 } 811 } 812 813 void 814 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 815 { 816 pid_t pid1; 817 pid_t pid2; 818 int fds[2]; 819 const char *xav[16]; 820 char tbuf[16]; 821 char *ptr; 822 int xac; 823 824 if (ac != 2) 825 mirror_usage(1); 826 827 TwoWayPipeOpt = 1; 828 signal(SIGPIPE, SIG_IGN); 829 830 again: 831 if (pipe(fds) < 0) { 832 perror("pipe"); 833 exit(1); 834 } 835 836 /* 837 * Source 838 */ 839 if ((pid1 = fork()) == 0) { 840 signal(SIGPIPE, SIG_DFL); 841 dup2(fds[0], 0); 842 dup2(fds[0], 1); 843 close(fds[0]); 844 close(fds[1]); 845 if ((ptr = strchr(av[0], ':')) != NULL) { 846 *ptr++ = 0; 847 xac = 0; 848 xav[xac++] = "ssh"; 849 xav[xac++] = av[0]; 850 xav[xac++] = "hammer"; 851 852 switch(VerboseOpt) { 853 case 0: 854 break; 855 case 1: 856 xav[xac++] = "-v"; 857 break; 858 case 2: 859 xav[xac++] = "-vv"; 860 break; 861 default: 862 xav[xac++] = "-vvv"; 863 break; 864 } 865 if (ForceYesOpt) { 866 xav[xac++] = "-y"; 867 } 868 xav[xac++] = "-2"; 869 if (TimeoutOpt) { 870 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 871 xav[xac++] = "-t"; 872 xav[xac++] = tbuf; 873 } 874 if (streaming) 875 xav[xac++] = "mirror-read-stream"; 876 else 877 xav[xac++] = "mirror-read"; 878 xav[xac++] = ptr; 879 xav[xac++] = NULL; 880 execv("/usr/bin/ssh", (void *)xav); 881 } else { 882 hammer_cmd_mirror_read(av, 1, streaming); 883 fflush(stdout); 884 fflush(stderr); 885 } 886 _exit(1); 887 } 888 889 /* 890 * Target 891 */ 892 if ((pid2 = fork()) == 0) { 893 signal(SIGPIPE, SIG_DFL); 894 dup2(fds[1], 0); 895 dup2(fds[1], 1); 896 close(fds[0]); 897 close(fds[1]); 898 if ((ptr = strchr(av[1], ':')) != NULL) { 899 *ptr++ = 0; 900 xac = 0; 901 xav[xac++] = "ssh"; 902 xav[xac++] = av[1]; 903 xav[xac++] = "hammer"; 904 905 switch(VerboseOpt) { 906 case 0: 907 break; 908 case 1: 909 xav[xac++] = "-v"; 910 break; 911 case 2: 912 xav[xac++] = "-vv"; 913 break; 914 default: 915 xav[xac++] = "-vvv"; 916 break; 917 } 918 if (ForceYesOpt) { 919 xav[xac++] = "-y"; 920 } 921 xav[xac++] = "-2"; 922 xav[xac++] = "mirror-write"; 923 xav[xac++] = ptr; 924 xav[xac++] = NULL; 925 execv("/usr/bin/ssh", (void *)xav); 926 } else { 927 hammer_cmd_mirror_write(av + 1, 1); 928 fflush(stdout); 929 fflush(stderr); 930 } 931 _exit(1); 932 } 933 close(fds[0]); 934 close(fds[1]); 935 936 while (waitpid(pid1, NULL, 0) <= 0) 937 ; 938 while (waitpid(pid2, NULL, 0) <= 0) 939 ; 940 941 /* 942 * If the link is lost restart 943 */ 944 if (streaming) { 945 if (VerboseOpt) { 946 fprintf(stderr, "\nLost Link\n"); 947 fflush(stderr); 948 } 949 sleep(15 + DelayOpt); 950 goto again; 951 } 952 953 } 954 955 /* 956 * Read and return multiple mrecords 957 */ 958 static int 959 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 960 { 961 hammer_ioc_mrecord_any_t mrec; 962 u_int count; 963 size_t n; 964 size_t i; 965 size_t bytes; 966 int type; 967 968 count = 0; 969 while (size - count >= HAMMER_MREC_HEADSIZE) { 970 /* 971 * Cached the record header in case we run out of buffer 972 * space. 973 */ 974 fflush(stdout); 975 if (pickup->signature == 0) { 976 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 977 i = read(fd, (char *)pickup + n, 978 HAMMER_MREC_HEADSIZE - n); 979 if (i <= 0) 980 break; 981 } 982 if (n == 0) 983 break; 984 if (n != HAMMER_MREC_HEADSIZE) { 985 fprintf(stderr, "read_mrecords: short read on pipe\n"); 986 exit(1); 987 } 988 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 989 fprintf(stderr, "read_mrecords: malformed record on pipe, " 990 "bad signature\n"); 991 exit(1); 992 } 993 } 994 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 995 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 996 fprintf(stderr, "read_mrecords: malformed record on pipe, " 997 "illegal rec_size\n"); 998 exit(1); 999 } 1000 1001 /* 1002 * Stop if we have insufficient space for the record and data. 1003 */ 1004 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1005 if (size - count < bytes) 1006 break; 1007 1008 /* 1009 * Stop if the record type is not a REC, SKIP, or PASS, 1010 * which are the only types the ioctl supports. Other types 1011 * are used only by the userland protocol. 1012 * 1013 * Ignore all flags. 1014 */ 1015 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1016 if (type != HAMMER_MREC_TYPE_PFSD && 1017 type != HAMMER_MREC_TYPE_REC && 1018 type != HAMMER_MREC_TYPE_SKIP && 1019 type != HAMMER_MREC_TYPE_PASS) { 1020 break; 1021 } 1022 1023 /* 1024 * Read the remainder and clear the pickup signature. 1025 */ 1026 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1027 i = read(fd, buf + count + n, bytes - n); 1028 if (i <= 0) 1029 break; 1030 } 1031 if (n != bytes) { 1032 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1033 exit(1); 1034 } 1035 1036 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1037 pickup->signature = 0; 1038 pickup->type = 0; 1039 mrec = (void *)(buf + count); 1040 1041 /* 1042 * Validate the completed record 1043 */ 1044 if (mrec->head.rec_crc != 1045 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1046 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1047 fprintf(stderr, "read_mrecords: malformed record " 1048 "on pipe, bad crc\n"); 1049 exit(1); 1050 } 1051 1052 /* 1053 * If its a B-Tree record validate the data crc. 1054 * 1055 * NOTE: If the VFS passes us an explicitly errorde mrec 1056 * we just pass it through. 1057 */ 1058 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1059 1060 if (type == HAMMER_MREC_TYPE_REC) { 1061 if (mrec->head.rec_size < 1062 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1063 fprintf(stderr, 1064 "read_mrecords: malformed record on " 1065 "pipe, illegal element data_len\n"); 1066 exit(1); 1067 } 1068 if (mrec->rec.leaf.data_len && 1069 mrec->rec.leaf.data_offset && 1070 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1071 fprintf(stderr, 1072 "read_mrecords: data_crc did not " 1073 "match data! obj=%016llx key=%016llx\n", 1074 mrec->rec.leaf.base.obj_id, 1075 mrec->rec.leaf.base.key); 1076 fprintf(stderr, 1077 "continuing, but there are problems\n"); 1078 } 1079 } 1080 count += bytes; 1081 } 1082 return(count); 1083 } 1084 1085 /* 1086 * Read and return a single mrecord. 1087 */ 1088 static 1089 hammer_ioc_mrecord_any_t 1090 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1091 { 1092 hammer_ioc_mrecord_any_t mrec; 1093 struct hammer_ioc_mrecord_head mrechd; 1094 size_t bytes; 1095 size_t n; 1096 size_t i; 1097 1098 if (pickup && pickup->type != 0) { 1099 mrechd = *pickup; 1100 pickup->signature = 0; 1101 pickup->type = 0; 1102 n = HAMMER_MREC_HEADSIZE; 1103 } else { 1104 /* 1105 * Read in the PFSD header from the sender. 1106 */ 1107 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1108 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1109 if (i <= 0) 1110 break; 1111 } 1112 if (n == 0) { 1113 *errorp = 0; /* EOF */ 1114 return(NULL); 1115 } 1116 if (n != HAMMER_MREC_HEADSIZE) { 1117 fprintf(stderr, "short read of mrecord header\n"); 1118 *errorp = EPIPE; 1119 return(NULL); 1120 } 1121 } 1122 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1123 fprintf(stderr, "read_mrecord: bad signature\n"); 1124 *errorp = EINVAL; 1125 return(NULL); 1126 } 1127 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1128 assert(bytes >= sizeof(mrechd)); 1129 mrec = malloc(bytes); 1130 mrec->head = mrechd; 1131 1132 while (n < bytes) { 1133 i = read(fdin, (char *)mrec + n, bytes - n); 1134 if (i <= 0) 1135 break; 1136 n += i; 1137 } 1138 if (n != bytes) { 1139 fprintf(stderr, "read_mrecord: short read on payload\n"); 1140 *errorp = EPIPE; 1141 return(NULL); 1142 } 1143 if (mrec->head.rec_crc != 1144 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1145 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1146 fprintf(stderr, "read_mrecord: bad CRC\n"); 1147 *errorp = EINVAL; 1148 return(NULL); 1149 } 1150 *errorp = 0; 1151 return(mrec); 1152 } 1153 1154 static 1155 void 1156 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec, 1157 int bytes) 1158 { 1159 char zbuf[HAMMER_HEAD_ALIGN]; 1160 int pad; 1161 1162 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1163 1164 assert(bytes >= (int)sizeof(mrec->head)); 1165 bzero(&mrec->head, sizeof(mrec->head)); 1166 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1167 mrec->head.type = type; 1168 mrec->head.rec_size = bytes; 1169 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1170 bytes - HAMMER_MREC_CRCOFF); 1171 if (write(fdout, mrec, bytes) != bytes) { 1172 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1173 errno, strerror(errno)); 1174 exit(1); 1175 } 1176 if (pad) { 1177 bzero(zbuf, pad); 1178 if (write(fdout, zbuf, pad) != pad) { 1179 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1180 errno, strerror(errno)); 1181 exit(1); 1182 } 1183 } 1184 } 1185 1186 /* 1187 * Generate a mirroring header with the pfs information of the 1188 * originating filesytem. 1189 */ 1190 static void 1191 generate_mrec_header(int fd, int pfs_id, 1192 union hammer_ioc_mrecord_any *mrec_tmp) 1193 { 1194 struct hammer_ioc_pseudofs_rw pfs; 1195 1196 bzero(&pfs, sizeof(pfs)); 1197 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1198 pfs.pfs_id = pfs_id; 1199 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1200 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1201 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1202 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1203 exit(1); 1204 } 1205 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1206 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n"); 1207 exit(1); 1208 } 1209 mrec_tmp->pfs.version = pfs.version; 1210 } 1211 1212 /* 1213 * Validate the pfs information from the originating filesystem 1214 * against the target filesystem. shared_uuid must match. 1215 * 1216 * return -1 if we got a TERM record 1217 */ 1218 static int 1219 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1220 struct hammer_ioc_mrecord_head *pickup, 1221 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1222 { 1223 struct hammer_ioc_pseudofs_rw pfs; 1224 struct hammer_pseudofs_data pfsd; 1225 hammer_ioc_mrecord_any_t mrec; 1226 int error; 1227 1228 /* 1229 * Get the PFSD info from the target filesystem. 1230 */ 1231 bzero(&pfs, sizeof(pfs)); 1232 bzero(&pfsd, sizeof(pfsd)); 1233 pfs.pfs_id = pfs_id; 1234 pfs.ondisk = &pfsd; 1235 pfs.bytes = sizeof(pfsd); 1236 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1237 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1238 exit(1); 1239 } 1240 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1241 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n"); 1242 exit(1); 1243 } 1244 1245 mrec = read_mrecord(fdin, &error, pickup); 1246 if (mrec == NULL) { 1247 if (error == 0) 1248 fprintf(stderr, "validate_mrec_header: short read\n"); 1249 exit(1); 1250 } 1251 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1252 free(mrec); 1253 return(-1); 1254 } 1255 1256 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1257 fprintf(stderr, "validate_mrec_header: did not get expected " 1258 "PFSD record type\n"); 1259 exit(1); 1260 } 1261 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1262 fprintf(stderr, "validate_mrec_header: unexpected payload " 1263 "size\n"); 1264 exit(1); 1265 } 1266 if (mrec->pfs.version != pfs.version) { 1267 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1268 exit(1); 1269 } 1270 1271 /* 1272 * Whew. Ok, is the read PFS info compatible with the target? 1273 */ 1274 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1275 sizeof(pfsd.shared_uuid)) != 0) { 1276 fprintf(stderr, 1277 "mirror-write: source and target have " 1278 "different shared-uuid's!\n"); 1279 exit(1); 1280 } 1281 if (is_target && 1282 (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) { 1283 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1284 exit(1); 1285 } 1286 if (tid_begp) 1287 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1288 if (tid_endp) 1289 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1290 free(mrec); 1291 return(0); 1292 } 1293 1294 static void 1295 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1296 { 1297 struct hammer_ioc_pseudofs_rw pfs; 1298 struct hammer_pseudofs_data pfsd; 1299 1300 bzero(&pfs, sizeof(pfs)); 1301 bzero(&pfsd, sizeof(pfsd)); 1302 pfs.pfs_id = pfs_id; 1303 pfs.ondisk = &pfsd; 1304 pfs.bytes = sizeof(pfsd); 1305 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1306 perror("update_pfs_snapshot (read)"); 1307 exit(1); 1308 } 1309 if (pfsd.sync_end_tid != snapshot_tid) { 1310 pfsd.sync_end_tid = snapshot_tid; 1311 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1312 perror("update_pfs_snapshot (rewrite)"); 1313 exit(1); 1314 } 1315 if (VerboseOpt >= 2) { 1316 fprintf(stderr, 1317 "Mirror-write: Completed, updated snapshot " 1318 "to %016llx\n", 1319 snapshot_tid); 1320 } 1321 } 1322 } 1323 1324 /* 1325 * Bandwidth-limited write in chunks 1326 */ 1327 static 1328 ssize_t 1329 writebw(int fd, const void *buf, size_t nbytes, 1330 u_int64_t *bwcount, struct timeval *tv1) 1331 { 1332 struct timeval tv2; 1333 size_t n; 1334 ssize_t r; 1335 ssize_t a; 1336 int usec; 1337 1338 a = 0; 1339 r = 0; 1340 while (nbytes) { 1341 if (*bwcount + nbytes > BandwidthOpt) 1342 n = BandwidthOpt - *bwcount; 1343 else 1344 n = nbytes; 1345 if (n) 1346 r = write(fd, buf, n); 1347 if (r >= 0) { 1348 a += r; 1349 nbytes -= r; 1350 buf = (const char *)buf + r; 1351 } 1352 if ((size_t)r != n) 1353 break; 1354 *bwcount += n; 1355 if (*bwcount >= BandwidthOpt) { 1356 gettimeofday(&tv2, NULL); 1357 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1358 (int)(tv2.tv_usec - tv1->tv_usec); 1359 if (usec >= 0 && usec < 1000000) 1360 usleep(1000000 - usec); 1361 gettimeofday(tv1, NULL); 1362 *bwcount -= BandwidthOpt; 1363 } 1364 } 1365 return(a ? a : r); 1366 } 1367 1368 /* 1369 * Get a yes or no answer from the terminal. The program may be run as 1370 * part of a two-way pipe so we cannot use stdin for this operation. 1371 */ 1372 static int 1373 getyn(void) 1374 { 1375 char buf[256]; 1376 FILE *fp; 1377 int result; 1378 1379 fp = fopen("/dev/tty", "r"); 1380 if (fp == NULL) { 1381 fprintf(stderr, "No terminal for response\n"); 1382 return(-1); 1383 } 1384 result = -1; 1385 while (fgets(buf, sizeof(buf), fp) != NULL) { 1386 if (buf[0] == 'y' || buf[0] == 'Y') { 1387 result = 1; 1388 break; 1389 } 1390 if (buf[0] == 'n' || buf[0] == 'N') { 1391 result = 0; 1392 break; 1393 } 1394 fprintf(stderr, "Response not understood\n"); 1395 break; 1396 } 1397 fclose(fp); 1398 return(result); 1399 } 1400 1401 static void 1402 mirror_usage(int code) 1403 { 1404 fprintf(stderr, 1405 "hammer mirror-read <filesystem> [begin-tid]\n" 1406 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1407 "hammer mirror-write <filesystem>\n" 1408 "hammer mirror-dump\n" 1409 "hammer mirror-copy [[user@]host:]<filesystem>" 1410 " [[user@]host:]<filesystem>\n" 1411 "hammer mirror-stream [[user@]host:]<filesystem>" 1412 " [[user@]host:]<filesystem>\n" 1413 ); 1414 exit(code); 1415 } 1416 1417