1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 */ 34 35 #include "hammer.h" 36 37 #define LINE1 0,20 38 #define LINE2 20,78 39 #define LINE3 90,70 40 41 #define SERIALBUF_SIZE (512 * 1024) 42 43 typedef struct histogram { 44 hammer_tid_t tid; 45 uint64_t bytes; 46 } *histogram_t; 47 48 const char *ScoreBoardFile; 49 const char *RestrictTarget; 50 51 static int read_mrecords(int fd, char *buf, u_int size, 52 hammer_ioc_mrecord_head_t pickup); 53 static int generate_histogram(int fd, const char *filesystem, 54 histogram_t *histogram_ary, 55 struct hammer_ioc_mirror_rw *mirror_base, 56 int *repeatp); 57 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp, 58 hammer_ioc_mrecord_head_t pickup); 59 static void write_mrecord(int fdout, uint32_t type, 60 hammer_ioc_mrecord_any_t mrec, int bytes); 61 static void generate_mrec_header(int fd, int pfs_id, 62 union hammer_ioc_mrecord_any *mrec_tmp); 63 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 64 struct hammer_ioc_mrecord_head *pickup, 65 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp); 66 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id); 67 static ssize_t writebw(int fd, const void *buf, size_t nbytes, 68 uint64_t *bwcount, struct timeval *tv1); 69 static int getyntty(void); 70 static void score_printf(size_t i, size_t w, const char *ctl, ...); 71 static void hammer_check_restrict(const char *filesystem); 72 static void mirror_usage(int code); 73 74 /* 75 * Generate a mirroring data stream from the specific source over the 76 * entire key range, but restricted to the specified transaction range. 77 * 78 * The HAMMER VFS does most of the work, we add a few new mrecord 79 * types to negotiate the TID ranges and verify that the entire 80 * stream made it to the destination. 81 * 82 * streaming will be 0 for mirror-read, 1 for mirror-stream. The code will 83 * set up a fake value of -1 when running the histogram for mirror-read. 84 */ 85 void 86 hammer_cmd_mirror_read(char **av, int ac, int streaming) 87 { 88 struct hammer_ioc_mirror_rw mirror; 89 struct hammer_ioc_pseudofs_rw pfs; 90 union hammer_ioc_mrecord_any mrec_tmp; 91 struct hammer_ioc_mrecord_head pickup; 92 hammer_ioc_mrecord_any_t mrec; 93 hammer_tid_t sync_tid; 94 histogram_t histogram_ary; 95 const char *filesystem; 96 char *buf = malloc(SERIALBUF_SIZE); 97 int interrupted = 0; 98 int error; 99 int fd; 100 int n; 101 int didwork; 102 int histogram; 103 int histindex; 104 int histmax; 105 int repeat = 0; 106 int sameline; 107 int64_t total_bytes; 108 time_t base_t = time(NULL); 109 struct timeval bwtv; 110 uint64_t bwcount; 111 uint64_t estbytes; 112 113 if (ac == 0 || ac > 2) { 114 mirror_usage(1); 115 /* not reached */ 116 } 117 filesystem = av[0]; 118 hammer_check_restrict(filesystem); 119 120 pickup.signature = 0; 121 pickup.type = 0; 122 histogram = 0; 123 histindex = 0; 124 histmax = 0; 125 histogram_ary = NULL; 126 sameline = 0; 127 128 again: 129 bzero(&mirror, sizeof(mirror)); 130 hammer_key_beg_init(&mirror.key_beg); 131 hammer_key_end_init(&mirror.key_end); 132 133 fd = getpfs(&pfs, filesystem); 134 135 if (streaming >= 0) 136 score_printf(LINE1, "Running"); 137 138 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 139 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 140 fflush(stderr); 141 sameline = 1; 142 } 143 sameline = 1; 144 total_bytes = 0; 145 gettimeofday(&bwtv, NULL); 146 bwcount = 0; 147 148 /* 149 * Send initial header for the purpose of determining the 150 * shared-uuid. 151 */ 152 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 153 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 154 &mrec_tmp, sizeof(mrec_tmp.pfs)); 155 156 /* 157 * In 2-way mode the target will send us a PFS info packet 158 * first. Use the target's current snapshot TID as our default 159 * begin TID. 160 */ 161 if (TwoWayPipeOpt) { 162 mirror.tid_beg = 0; 163 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 164 NULL, &mirror.tid_beg); 165 if (n < 0) { /* got TERM record */ 166 relpfs(fd, &pfs); 167 free(buf); 168 free(histogram_ary); 169 return; 170 } 171 ++mirror.tid_beg; 172 } else if (streaming && histogram) { 173 mirror.tid_beg = histogram_ary[histindex].tid + 1; 174 } else { 175 mirror.tid_beg = 0; 176 } 177 178 /* 179 * Write out the PFS header, tid_beg will be updated if our PFS 180 * has a larger begin sync. tid_end is set to the latest source 181 * TID whos flush cycle has completed. 182 */ 183 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 184 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 185 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 186 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 187 mirror.ubuf = buf; 188 mirror.size = SERIALBUF_SIZE; 189 mirror.pfs_id = pfs.pfs_id; 190 mirror.shared_uuid = pfs.ondisk->shared_uuid; 191 192 /* 193 * XXX If the histogram is exhausted and the TID delta is large 194 * the stream might have been offline for a while and is 195 * now picking it up again. Do another histogram. 196 */ 197 #if 0 198 if (streaming && histogram && histindex == histend) { 199 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 200 histogram = 0; 201 } 202 #endif 203 204 /* 205 * Initial bulk startup control, try to do some incremental 206 * mirroring in order to allow the stream to be killed and 207 * restarted without having to start over. 208 */ 209 if (histogram == 0 && BulkOpt == 0) { 210 if (VerboseOpt && repeat == 0) { 211 fprintf(stderr, "\n"); 212 sameline = 0; 213 } 214 histmax = generate_histogram(fd, filesystem, 215 &histogram_ary, &mirror, 216 &repeat); 217 histindex = 0; 218 histogram = 1; 219 220 /* 221 * Just stream the histogram, then stop 222 */ 223 if (streaming == 0) 224 streaming = -1; 225 } 226 227 if (streaming && histogram) { 228 ++histindex; 229 mirror.tid_end = histogram_ary[histindex].tid; 230 estbytes = histogram_ary[histindex-1].bytes; 231 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 232 } else { 233 estbytes = 0; 234 } 235 236 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 237 &mrec_tmp, sizeof(mrec_tmp.pfs)); 238 239 /* 240 * A cycle file overrides the beginning TID only if we are 241 * not operating in two-way or histogram mode. 242 */ 243 if (TwoWayPipeOpt == 0 && histogram == 0) 244 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 245 246 /* 247 * An additional argument overrides the beginning TID regardless 248 * of what mode we are in. This is not recommending if operating 249 * in two-way mode. 250 */ 251 if (ac == 2) 252 mirror.tid_beg = strtoull(av[1], NULL, 0); 253 254 if (streaming == 0 || VerboseOpt >= 2) { 255 fprintf(stderr, 256 "Mirror-read: Mirror %016jx to %016jx", 257 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 258 if (histogram) 259 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 260 fprintf(stderr, "\n"); 261 fflush(stderr); 262 } 263 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 264 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 265 (uintmax_t)mirror.key_beg.obj_id); 266 } 267 268 /* 269 * Nothing to do if begin equals end. 270 */ 271 if (mirror.tid_beg >= mirror.tid_end) { 272 if (streaming == 0 || VerboseOpt >= 2) 273 fprintf(stderr, "Mirror-read: No work to do\n"); 274 sleep(DelayOpt); 275 didwork = 0; 276 histogram = 0; 277 goto done; 278 } 279 didwork = 1; 280 281 /* 282 * Write out bulk records 283 */ 284 mirror.ubuf = buf; 285 mirror.size = SERIALBUF_SIZE; 286 287 do { 288 mirror.count = 0; 289 mirror.pfs_id = pfs.pfs_id; 290 mirror.shared_uuid = pfs.ondisk->shared_uuid; 291 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 292 score_printf(LINE3, "Mirror-read %s failed: %s", 293 filesystem, strerror(errno)); 294 err(1, "Mirror-read %s failed", filesystem); 295 /* not reached */ 296 } 297 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 298 score_printf(LINE3, "Mirror-read %s fatal error %d", 299 filesystem, mirror.head.error); 300 errx(1, "Mirror-read %s fatal error %d", 301 filesystem, mirror.head.error); 302 /* not reached */ 303 } 304 if (mirror.count) { 305 if (BandwidthOpt) { 306 n = writebw(1, mirror.ubuf, mirror.count, 307 &bwcount, &bwtv); 308 } else { 309 n = write(1, mirror.ubuf, mirror.count); 310 } 311 if (n != mirror.count) { 312 score_printf(LINE3, 313 "Mirror-read %s failed: " 314 "short write", 315 filesystem); 316 errx(1, "Mirror-read %s failed: short write", 317 filesystem); 318 /* not reached */ 319 } 320 } 321 total_bytes += mirror.count; 322 if (streaming && VerboseOpt) { 323 fprintf(stderr, 324 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 325 (uintmax_t)mirror.key_cur.obj_id, 326 (uintmax_t)mirror.tid_beg, 327 (uintmax_t)mirror.tid_end, 328 (intmax_t)total_bytes); 329 fflush(stderr); 330 sameline = 0; 331 } else if (streaming) { 332 score_printf(LINE2, 333 "obj=%016jx tids=%016jx:%016jx %11jd", 334 (uintmax_t)mirror.key_cur.obj_id, 335 (uintmax_t)mirror.tid_beg, 336 (uintmax_t)mirror.tid_end, 337 (intmax_t)total_bytes); 338 } 339 mirror.key_beg = mirror.key_cur; 340 341 /* 342 * Deal with time limit option 343 */ 344 if (TimeoutOpt && 345 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 346 score_printf(LINE3, 347 "Mirror-read %s interrupted by timer at" 348 " %016jx", 349 filesystem, 350 (uintmax_t)mirror.key_cur.obj_id); 351 fprintf(stderr, 352 "Mirror-read %s interrupted by timer at" 353 " %016jx\n", 354 filesystem, 355 (uintmax_t)mirror.key_cur.obj_id); 356 interrupted = 1; 357 break; 358 } 359 } while (mirror.count != 0); 360 361 done: 362 if (streaming && VerboseOpt && sameline == 0) { 363 fprintf(stderr, "\n"); 364 fflush(stderr); 365 sameline = 1; 366 } 367 368 /* 369 * Write out the termination sync record - only if not interrupted 370 */ 371 if (interrupted == 0) { 372 if (didwork) { 373 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 374 &mrec_tmp, sizeof(mrec_tmp.sync)); 375 } else { 376 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 377 &mrec_tmp, sizeof(mrec_tmp.sync)); 378 } 379 } 380 381 /* 382 * If the -2 option was given (automatic when doing mirror-copy), 383 * a two-way pipe is assumed and we expect a response mrec from 384 * the target. 385 */ 386 if (TwoWayPipeOpt) { 387 mrec = read_mrecord(0, &error, &pickup); 388 if (mrec == NULL || 389 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 390 mrec->head.rec_size != sizeof(mrec->update)) { 391 errx(1, "mirror_read: Did not get final " 392 "acknowledgement packet from target"); 393 /* not reached */ 394 } 395 if (interrupted) { 396 if (CyclePath) { 397 hammer_set_cycle(&mirror.key_cur, 398 mirror.tid_beg); 399 fprintf(stderr, "Cyclefile %s updated for " 400 "continuation\n", CyclePath); 401 } 402 } else { 403 sync_tid = mrec->update.tid; 404 if (CyclePath) { 405 hammer_key_beg_init(&mirror.key_beg); 406 hammer_set_cycle(&mirror.key_beg, sync_tid); 407 fprintf(stderr, 408 "Cyclefile %s updated to 0x%016jx\n", 409 CyclePath, (uintmax_t)sync_tid); 410 } 411 } 412 free(mrec); 413 } else if (CyclePath) { 414 /* NOTE! mirror.tid_beg cannot be updated */ 415 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 416 "fully updated unless you use mirror-copy\n"); 417 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 418 } 419 if (streaming && interrupted == 0) { 420 time_t t1 = time(NULL); 421 time_t t2; 422 423 /* 424 * Try to break down large bulk transfers into smaller ones 425 * so it can sync the transaction id on the slave. This 426 * way if we get interrupted a restart doesn't have to 427 * start from scratch. 428 */ 429 if (streaming && histogram) { 430 if (histindex != histmax) { 431 if (VerboseOpt && VerboseOpt < 2 && 432 streaming >= 0) { 433 fprintf(stderr, " (bulk incremental)"); 434 } 435 relpfs(fd, &pfs); 436 goto again; 437 } 438 } 439 440 if (VerboseOpt && streaming >= 0) { 441 fprintf(stderr, " W"); 442 fflush(stderr); 443 } else if (streaming >= 0) { 444 score_printf(LINE1, "Waiting"); 445 } 446 pfs.ondisk->sync_end_tid = mirror.tid_end; 447 if (streaming < 0) { 448 /* 449 * Fake streaming mode when using a histogram to 450 * break up a mirror-read, do not wait on source. 451 */ 452 streaming = 0; 453 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 454 score_printf(LINE3, 455 "Mirror-read %s: cannot stream: %s\n", 456 filesystem, strerror(errno)); 457 fprintf(stderr, 458 "Mirror-read %s: cannot stream: %s\n", 459 filesystem, strerror(errno)); 460 } else { 461 t2 = time(NULL) - t1; 462 if (t2 >= 0 && t2 < DelayOpt) { 463 if (VerboseOpt) { 464 fprintf(stderr, "\bD"); 465 fflush(stderr); 466 } 467 sleep(DelayOpt - t2); 468 } 469 if (VerboseOpt) { 470 fprintf(stderr, "\b "); 471 fflush(stderr); 472 } 473 relpfs(fd, &pfs); 474 goto again; 475 } 476 } 477 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 478 &mrec_tmp, sizeof(mrec_tmp.sync)); 479 relpfs(fd, &pfs); 480 free(buf); 481 free(histogram_ary); 482 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 483 } 484 485 /* 486 * What we are trying to do here is figure out how much data is 487 * going to be sent for the TID range and to break the TID range 488 * down into reasonably-sized slices (from the point of view of 489 * data sent) so a lost connection can restart at a reasonable 490 * place and not all the way back at the beginning. 491 * 492 * An entry's TID serves as the end_tid for the prior entry 493 * So we have to offset the calculation by 1 so that TID falls into 494 * the previous entry when populating entries. 495 * 496 * Because the transaction id space is bursty we need a relatively 497 * large number of buckets (like a million) to do a reasonable job 498 * for things like an initial bulk mirrors on a very large filesystem. 499 */ 500 #define HIST_COUNT (1024 * 1024) 501 502 static int 503 generate_histogram(int fd, const char *filesystem, 504 histogram_t *histogram_ary, 505 struct hammer_ioc_mirror_rw *mirror_base, 506 int *repeatp) 507 { 508 struct hammer_ioc_mirror_rw mirror; 509 union hammer_ioc_mrecord_any *mrec; 510 hammer_tid_t tid_beg; 511 hammer_tid_t tid_end; 512 hammer_tid_t tid; 513 hammer_tid_t tidx; 514 uint64_t *tid_bytes; 515 uint64_t total; 516 uint64_t accum; 517 int chunkno; 518 int i; 519 int res; 520 int off; 521 int len; 522 523 mirror = *mirror_base; 524 tid_beg = mirror.tid_beg; 525 tid_end = mirror.tid_end; 526 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 527 528 if (*histogram_ary == NULL) { 529 *histogram_ary = malloc(sizeof(struct histogram) * 530 (HIST_COUNT + 2)); 531 } 532 if (tid_beg >= tid_end) 533 return(0); 534 535 /* needs 2 extra */ 536 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 537 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 538 539 if (*repeatp == 0) { 540 fprintf(stderr, "Prescan to break up bulk transfer"); 541 if (VerboseOpt > 1) 542 fprintf(stderr, " (%juMB chunks)", 543 (uintmax_t)(SplitupOpt / (1024 * 1024))); 544 fprintf(stderr, "\n"); 545 } 546 547 /* 548 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 549 * 550 * Note: Estimates can be off when the mirror is way behind due 551 * to skips. 552 */ 553 total = 0; 554 accum = 0; 555 chunkno = 0; 556 for (;;) { 557 mirror.count = 0; 558 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 559 err(1, "Mirror-read %s failed", filesystem); 560 /* not reached */ 561 } 562 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 563 errx(1, "Mirror-read %s fatal error %d", 564 filesystem, mirror.head.error); 565 /* not reached */ 566 } 567 for (off = 0; 568 off < mirror.count; 569 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) { 570 mrec = (void *)((char *)mirror.ubuf + off); 571 572 /* 573 * We only care about general RECs and PASS 574 * records. We ignore SKIPs. 575 */ 576 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 577 case HAMMER_MREC_TYPE_REC: 578 case HAMMER_MREC_TYPE_PASS: 579 break; 580 default: 581 continue; 582 } 583 584 /* 585 * Calculate for two indices, create_tid and 586 * delete_tid. Record data only applies to 587 * the create_tid. 588 * 589 * When tid is exactly on the boundary it really 590 * belongs to the previous entry because scans 591 * are inclusive of the ending entry. 592 */ 593 tid = mrec->rec.leaf.base.delete_tid; 594 if (tid && tid >= tid_beg && tid <= tid_end) { 595 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 596 if (mrec->head.type == 597 HAMMER_MREC_TYPE_REC) { 598 len -= HAMMER_HEAD_DOALIGN( 599 mrec->rec.leaf.data_len); 600 assert(len > 0); 601 } 602 i = (tid - tid_beg) * HIST_COUNT / 603 (tid_end - tid_beg); 604 tidx = tid_beg + i * (tid_end - tid_beg) / 605 HIST_COUNT; 606 if (tid == tidx && i) 607 --i; 608 assert(i >= 0 && i < HIST_COUNT); 609 tid_bytes[i] += len; 610 total += len; 611 accum += len; 612 } 613 614 tid = mrec->rec.leaf.base.create_tid; 615 if (tid && tid >= tid_beg && tid <= tid_end) { 616 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 617 if (mrec->head.type == 618 HAMMER_MREC_TYPE_REC_NODATA) { 619 len += HAMMER_HEAD_DOALIGN( 620 mrec->rec.leaf.data_len); 621 } 622 i = (tid - tid_beg) * HIST_COUNT / 623 (tid_end - tid_beg); 624 tidx = tid_beg + i * (tid_end - tid_beg) / 625 HIST_COUNT; 626 if (tid == tidx && i) 627 --i; 628 assert(i >= 0 && i < HIST_COUNT); 629 tid_bytes[i] += len; 630 total += len; 631 accum += len; 632 } 633 } 634 if (*repeatp == 0 && accum > SplitupOpt) { 635 if (VerboseOpt > 1) { 636 fprintf(stderr, "."); 637 fflush(stderr); 638 } 639 ++chunkno; 640 score_printf(LINE2, "Prescan chunk %d", chunkno); 641 accum = 0; 642 } 643 if (mirror.count == 0) 644 break; 645 mirror.key_beg = mirror.key_cur; 646 } 647 648 /* 649 * Reduce to SplitupOpt (default 4GB) chunks. This code may 650 * use up to two additional elements. Do the array in-place. 651 * 652 * Inefficient degenerate cases can occur if we do not accumulate 653 * at least the requested split amount, so error on the side of 654 * going over a bit. 655 */ 656 res = 0; 657 (*histogram_ary)[res].tid = tid_beg; 658 (*histogram_ary)[res].bytes = tid_bytes[0]; 659 for (i = 1; i < HIST_COUNT; ++i) { 660 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 661 ++res; 662 (*histogram_ary)[res].tid = tid_beg + 663 i * (tid_end - tid_beg) / 664 HIST_COUNT; 665 (*histogram_ary)[res].bytes = 0; 666 667 } 668 (*histogram_ary)[res].bytes += tid_bytes[i]; 669 } 670 ++res; 671 (*histogram_ary)[res].tid = tid_end; 672 (*histogram_ary)[res].bytes = -1; 673 674 if (*repeatp == 0) { 675 if (VerboseOpt > 1) 676 fprintf(stderr, "\n"); /* newline after ... */ 677 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 678 res, (uintmax_t)total / (1024 * 1024)); 679 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 680 res, (uintmax_t)total / (1024 * 1024)); 681 for (i = 0; i < res && i < 3; ++i) { 682 if (i) 683 fprintf(stderr, ", "); 684 fprintf(stderr, "%ju", 685 (uintmax_t)(*histogram_ary)[i].bytes); 686 } 687 if (i < res) 688 fprintf(stderr, ", ..."); 689 fprintf(stderr, ")\n"); 690 } 691 assert(res <= HIST_COUNT); 692 *repeatp = 1; 693 694 free(tid_bytes); 695 return(res); 696 } 697 698 static void 699 create_pfs(const char *filesystem, uuid_t *s_uuid) 700 { 701 if (ForceYesOpt == 1) { 702 fprintf(stderr, "PFS slave %s does not exist. " 703 "Auto create new slave PFS!\n", filesystem); 704 705 } else { 706 fprintf(stderr, "PFS slave %s does not exist.\n" 707 "Do you want to create a new slave PFS? [y/n] ", 708 filesystem); 709 fflush(stderr); 710 if (getyntty() != 1) { 711 errx(1, "Aborting operation"); 712 /* not reached */ 713 } 714 } 715 716 uint32_t status; 717 char *shared_uuid = NULL; 718 uuid_to_string(s_uuid, &shared_uuid, &status); 719 720 char *cmd = NULL; 721 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 722 filesystem, shared_uuid); 723 free(shared_uuid); 724 725 if (cmd == NULL) { 726 errx(1, "Failed to alloc memory"); 727 /* not reached */ 728 } 729 if (system(cmd) != 0) 730 fprintf(stderr, "Failed to create PFS\n"); 731 free(cmd); 732 } 733 734 /* 735 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 736 * some additional packet types to negotiate TID ranges and to verify 737 * completion. The HAMMER VFS does most of the work. 738 * 739 * It is important to note that the mirror.key_{beg,end} range must 740 * match the ranged used by the original. For now both sides use 741 * range the entire key space. 742 * 743 * It is even more important that the records in the stream conform 744 * to the TID range also supplied in the stream. The HAMMER VFS will 745 * use the REC, PASS, and SKIP record types to track the portions of 746 * the B-Tree being scanned in order to be able to proactively delete 747 * records on the target within those active areas that are not mentioned 748 * by the source. 749 * 750 * The mirror.key_cur field is used by the VFS to do this tracking. It 751 * must be initialized to key_beg but then is persistently updated by 752 * the HAMMER VFS on each successive ioctl() call. If you blow up this 753 * field you will blow up the mirror target, possibly to the point of 754 * deleting everything. As a safety measure the HAMMER VFS simply marks 755 * the records that the source has destroyed as deleted on the target, 756 * and normal pruning operations will deal with their final disposition 757 * at some later time. 758 */ 759 void 760 hammer_cmd_mirror_write(char **av, int ac) 761 { 762 struct hammer_ioc_mirror_rw mirror; 763 const char *filesystem; 764 char *buf = malloc(SERIALBUF_SIZE); 765 struct hammer_ioc_pseudofs_rw pfs; 766 struct hammer_ioc_mrecord_head pickup; 767 struct hammer_ioc_synctid synctid; 768 union hammer_ioc_mrecord_any mrec_tmp; 769 hammer_ioc_mrecord_any_t mrec; 770 struct stat st; 771 int error; 772 int fd; 773 int n; 774 775 if (ac != 1) { 776 mirror_usage(1); 777 /* not reached */ 778 } 779 filesystem = av[0]; 780 hammer_check_restrict(filesystem); 781 782 pickup.signature = 0; 783 pickup.type = 0; 784 785 again: 786 bzero(&mirror, sizeof(mirror)); 787 hammer_key_beg_init(&mirror.key_beg); 788 hammer_key_end_init(&mirror.key_end); 789 mirror.key_end = mirror.key_beg; 790 791 /* 792 * Read initial packet 793 */ 794 mrec = read_mrecord(0, &error, &pickup); 795 if (mrec == NULL) { 796 if (error == 0) { 797 errx(1, "validate_mrec_header: short read"); 798 /* not reached */ 799 } 800 exit(1); 801 } 802 /* 803 * Validate packet 804 */ 805 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 806 free(buf); 807 return; 808 } 809 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 810 errx(1, "validate_mrec_header: did not get expected " 811 "PFSD record type"); 812 /* not reached */ 813 } 814 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 815 errx(1, "validate_mrec_header: unexpected payload size"); 816 /* not reached */ 817 } 818 819 /* 820 * Create slave PFS if it doesn't yet exist 821 */ 822 if (lstat(filesystem, &st) != 0) 823 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 824 free(mrec); 825 mrec = NULL; 826 827 fd = getpfs(&pfs, filesystem); 828 829 /* 830 * In two-way mode the target writes out a PFS packet first. 831 * The source uses our tid_end as its tid_beg by default, 832 * picking up where it left off. 833 */ 834 mirror.tid_beg = 0; 835 if (TwoWayPipeOpt) { 836 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 837 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 838 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 839 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 840 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 841 &mrec_tmp, sizeof(mrec_tmp.pfs)); 842 } 843 844 /* 845 * Read and process the PFS header. The source informs us of 846 * the TID range the stream represents. 847 */ 848 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 849 &mirror.tid_beg, &mirror.tid_end); 850 if (n < 0) { /* got TERM record */ 851 relpfs(fd, &pfs); 852 free(buf); 853 return; 854 } 855 856 mirror.ubuf = buf; 857 mirror.size = SERIALBUF_SIZE; 858 859 /* 860 * Read and process bulk records (REC, PASS, and SKIP types). 861 * 862 * On your life, do NOT mess with mirror.key_cur or your mirror 863 * target may become history. 864 */ 865 for (;;) { 866 mirror.count = 0; 867 mirror.pfs_id = pfs.pfs_id; 868 mirror.shared_uuid = pfs.ondisk->shared_uuid; 869 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 870 if (mirror.size <= 0) 871 break; 872 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 873 err(1, "Mirror-write %s failed", filesystem); 874 /* not reached */ 875 } 876 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 877 errx(1, "Mirror-write %s fatal error %d", 878 filesystem, mirror.head.error); 879 /* not reached */ 880 } 881 #if 0 882 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 883 errx(1, "Mirror-write %s interrupted by timer at" 884 " %016llx", 885 filesystem, 886 mirror.key_cur.obj_id); 887 /* not reached */ 888 } 889 #endif 890 } 891 892 /* 893 * Read and process the termination sync record. 894 */ 895 mrec = read_mrecord(0, &error, &pickup); 896 897 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 898 fprintf(stderr, "Mirror-write: received termination request\n"); 899 relpfs(fd, &pfs); 900 free(mrec); 901 free(buf); 902 return; 903 } 904 905 if (mrec == NULL || 906 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 907 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 908 mrec->head.rec_size != sizeof(mrec->sync)) { 909 errx(1, "Mirror-write %s: Did not get termination " 910 "sync record, or rec_size is wrong rt=%d", 911 filesystem, (mrec ? (int)mrec->head.type : -1)); 912 /* not reached */ 913 } 914 915 /* 916 * Update the PFS info on the target so the user has visibility 917 * into the new snapshot, and sync the target filesystem. 918 */ 919 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 920 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 921 922 bzero(&synctid, sizeof(synctid)); 923 synctid.op = HAMMER_SYNCTID_SYNC2; 924 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 925 926 if (VerboseOpt >= 2) { 927 fprintf(stderr, "Mirror-write %s: succeeded\n", 928 filesystem); 929 } 930 } 931 932 free(mrec); 933 mrec = NULL; 934 935 /* 936 * Report back to the originator. 937 */ 938 if (TwoWayPipeOpt) { 939 mrec_tmp.update.tid = mirror.tid_end; 940 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 941 &mrec_tmp, sizeof(mrec_tmp.update)); 942 } else { 943 printf("Source can update synctid to 0x%016jx\n", 944 (uintmax_t)mirror.tid_end); 945 } 946 relpfs(fd, &pfs); 947 goto again; 948 } 949 950 void 951 hammer_cmd_mirror_dump(char **av, int ac) 952 { 953 char *buf = malloc(SERIALBUF_SIZE); 954 struct hammer_ioc_mrecord_head pickup; 955 hammer_ioc_mrecord_any_t mrec; 956 int error; 957 int size; 958 int offset; 959 int bytes; 960 int header_only = 0; 961 962 if (ac == 1 && strcmp(*av, "header") == 0) { 963 header_only = 1; 964 } else if (ac != 0) { 965 mirror_usage(1); 966 /* not reached */ 967 } 968 969 /* 970 * Read and process the PFS header 971 */ 972 pickup.signature = 0; 973 pickup.type = 0; 974 975 mrec = read_mrecord(0, &error, &pickup); 976 977 /* 978 * Dump the PFS header. mirror-dump takes its input from the output 979 * of a mirror-read so getpfs() can't be used to get a fd to be passed 980 * to dump_pfsd(). 981 */ 982 if (header_only && mrec != NULL) { 983 dump_pfsd(&mrec->pfs.pfsd, -1); 984 free(mrec); 985 free(buf); 986 return; 987 } 988 free(mrec); 989 990 again: 991 /* 992 * Read and process bulk records 993 */ 994 for (;;) { 995 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 996 if (size <= 0) 997 break; 998 offset = 0; 999 while (offset < size) { 1000 mrec = (void *)((char *)buf + offset); 1001 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 1002 if (offset + bytes > size) { 1003 errx(1, "Misaligned record"); 1004 /* not reached */ 1005 } 1006 1007 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 1008 case HAMMER_MREC_TYPE_REC_BADCRC: 1009 case HAMMER_MREC_TYPE_REC: 1010 printf("Record lo=%08x obj=%016jx key=%016jx " 1011 "rt=%02x ot=%02x", 1012 mrec->rec.leaf.base.localization, 1013 (uintmax_t)mrec->rec.leaf.base.obj_id, 1014 (uintmax_t)mrec->rec.leaf.base.key, 1015 mrec->rec.leaf.base.rec_type, 1016 mrec->rec.leaf.base.obj_type); 1017 if (mrec->head.type == 1018 HAMMER_MREC_TYPE_REC_BADCRC) { 1019 printf(" (BAD CRC)"); 1020 } 1021 printf("\n"); 1022 printf(" tids %016jx:%016jx data=%d\n", 1023 (uintmax_t)mrec->rec.leaf.base.create_tid, 1024 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1025 mrec->rec.leaf.data_len); 1026 break; 1027 case HAMMER_MREC_TYPE_PASS: 1028 printf("Pass lo=%08x obj=%016jx key=%016jx " 1029 "rt=%02x ot=%02x\n", 1030 mrec->rec.leaf.base.localization, 1031 (uintmax_t)mrec->rec.leaf.base.obj_id, 1032 (uintmax_t)mrec->rec.leaf.base.key, 1033 mrec->rec.leaf.base.rec_type, 1034 mrec->rec.leaf.base.obj_type); 1035 printf(" tids %016jx:%016jx data=%d\n", 1036 (uintmax_t)mrec->rec.leaf.base.create_tid, 1037 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1038 mrec->rec.leaf.data_len); 1039 break; 1040 case HAMMER_MREC_TYPE_SKIP: 1041 printf("Skip lo=%08x obj=%016jx key=%016jx rt=%02x to\n" 1042 " lo=%08x obj=%016jx key=%016jx rt=%02x\n", 1043 mrec->skip.skip_beg.localization, 1044 (uintmax_t)mrec->skip.skip_beg.obj_id, 1045 (uintmax_t)mrec->skip.skip_beg.key, 1046 mrec->skip.skip_beg.rec_type, 1047 mrec->skip.skip_end.localization, 1048 (uintmax_t)mrec->skip.skip_end.obj_id, 1049 (uintmax_t)mrec->skip.skip_end.key, 1050 mrec->skip.skip_end.rec_type); 1051 default: 1052 break; 1053 } 1054 offset += bytes; 1055 } 1056 } 1057 1058 /* 1059 * Read and process the termination sync record. 1060 */ 1061 mrec = read_mrecord(0, &error, &pickup); 1062 if (mrec == NULL || 1063 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1064 mrec->head.type != HAMMER_MREC_TYPE_IDLE)) { 1065 fprintf(stderr, "Mirror-dump: Did not get termination " 1066 "sync record\n"); 1067 } 1068 free(mrec); 1069 1070 /* 1071 * Continue with more batches until EOF. 1072 */ 1073 mrec = read_mrecord(0, &error, &pickup); 1074 if (mrec) { 1075 free(mrec); 1076 goto again; 1077 } 1078 free(buf); 1079 } 1080 1081 void 1082 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1083 { 1084 pid_t pid1; 1085 pid_t pid2; 1086 int fds[2]; 1087 const char *xav[32]; 1088 char tbuf[16]; 1089 char *sh, *user, *host, *rfs; 1090 int xac; 1091 1092 if (ac != 2) { 1093 mirror_usage(1); 1094 /* not reached */ 1095 } 1096 1097 TwoWayPipeOpt = 1; 1098 signal(SIGPIPE, SIG_IGN); 1099 1100 again: 1101 if (pipe(fds) < 0) { 1102 err(1, "pipe"); 1103 /* not reached */ 1104 } 1105 1106 /* 1107 * Source 1108 */ 1109 if ((pid1 = fork()) == 0) { 1110 signal(SIGPIPE, SIG_DFL); 1111 dup2(fds[0], 0); 1112 dup2(fds[0], 1); 1113 close(fds[0]); 1114 close(fds[1]); 1115 if ((rfs = strchr(av[0], ':')) != NULL) { 1116 xac = 0; 1117 1118 if((sh = getenv("HAMMER_RSH")) == NULL) 1119 xav[xac++] = "ssh"; 1120 else 1121 xav[xac++] = sh; 1122 1123 if (CompressOpt) 1124 xav[xac++] = "-C"; 1125 1126 if ((host = strchr(av[0], '@')) != NULL) { 1127 user = strndup( av[0], (host++ - av[0])); 1128 host = strndup( host, (rfs++ - host)); 1129 xav[xac++] = "-l"; 1130 xav[xac++] = user; 1131 xav[xac++] = host; 1132 } else { 1133 host = strndup( av[0], (rfs++ - av[0])); 1134 user = NULL; 1135 xav[xac++] = host; 1136 } 1137 1138 1139 if (SshPort) { 1140 xav[xac++] = "-p"; 1141 xav[xac++] = SshPort; 1142 } 1143 1144 xav[xac++] = "hammer"; 1145 1146 switch(VerboseOpt) { 1147 case 0: 1148 break; 1149 case 1: 1150 xav[xac++] = "-v"; 1151 break; 1152 case 2: 1153 xav[xac++] = "-vv"; 1154 break; 1155 default: 1156 xav[xac++] = "-vvv"; 1157 break; 1158 } 1159 if (ForceYesOpt) 1160 xav[xac++] = "-y"; 1161 xav[xac++] = "-2"; 1162 if (TimeoutOpt) { 1163 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1164 xav[xac++] = "-t"; 1165 xav[xac++] = tbuf; 1166 } 1167 if (SplitupOptStr) { 1168 xav[xac++] = "-S"; 1169 xav[xac++] = SplitupOptStr; 1170 } 1171 if (streaming) 1172 xav[xac++] = "mirror-read-stream"; 1173 else 1174 xav[xac++] = "mirror-read"; 1175 xav[xac++] = rfs; 1176 xav[xac++] = NULL; 1177 execvp(*xav, (void *)xav); 1178 } else { 1179 hammer_cmd_mirror_read(av, 1, streaming); 1180 fflush(stdout); 1181 fflush(stderr); 1182 } 1183 _exit(1); 1184 } 1185 1186 /* 1187 * Target 1188 */ 1189 if ((pid2 = fork()) == 0) { 1190 signal(SIGPIPE, SIG_DFL); 1191 dup2(fds[1], 0); 1192 dup2(fds[1], 1); 1193 close(fds[0]); 1194 close(fds[1]); 1195 if ((rfs = strchr(av[1], ':')) != NULL) { 1196 xac = 0; 1197 1198 if((sh = getenv("HAMMER_RSH")) == NULL) 1199 xav[xac++] = "ssh"; 1200 else 1201 xav[xac++] = sh; 1202 1203 if (CompressOpt) 1204 xav[xac++] = "-C"; 1205 1206 if ((host = strchr(av[1], '@')) != NULL) { 1207 user = strndup( av[1], (host++ - av[1])); 1208 host = strndup( host, (rfs++ - host)); 1209 xav[xac++] = "-l"; 1210 xav[xac++] = user; 1211 xav[xac++] = host; 1212 } else { 1213 host = strndup( av[1], (rfs++ - av[1])); 1214 user = NULL; 1215 xav[xac++] = host; 1216 } 1217 1218 if (SshPort) { 1219 xav[xac++] = "-p"; 1220 xav[xac++] = SshPort; 1221 } 1222 1223 xav[xac++] = "hammer"; 1224 1225 switch(VerboseOpt) { 1226 case 0: 1227 break; 1228 case 1: 1229 xav[xac++] = "-v"; 1230 break; 1231 case 2: 1232 xav[xac++] = "-vv"; 1233 break; 1234 default: 1235 xav[xac++] = "-vvv"; 1236 break; 1237 } 1238 if (ForceYesOpt) 1239 xav[xac++] = "-y"; 1240 xav[xac++] = "-2"; 1241 xav[xac++] = "mirror-write"; 1242 xav[xac++] = rfs; 1243 xav[xac++] = NULL; 1244 execvp(*xav, (void *)xav); 1245 } else { 1246 hammer_cmd_mirror_write(av + 1, 1); 1247 fflush(stdout); 1248 fflush(stderr); 1249 } 1250 _exit(1); 1251 } 1252 close(fds[0]); 1253 close(fds[1]); 1254 1255 while (waitpid(pid1, NULL, 0) <= 0) 1256 ; 1257 while (waitpid(pid2, NULL, 0) <= 0) 1258 ; 1259 1260 /* 1261 * If the link is lost restart 1262 */ 1263 if (streaming) { 1264 if (VerboseOpt) { 1265 fprintf(stderr, "\nLost Link\n"); 1266 fflush(stderr); 1267 } 1268 sleep(15 + DelayOpt); 1269 goto again; 1270 } 1271 1272 } 1273 1274 /* 1275 * Read and return multiple mrecords 1276 */ 1277 static int 1278 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1279 { 1280 hammer_ioc_mrecord_any_t mrec; 1281 u_int count; 1282 size_t n; 1283 size_t i; 1284 size_t bytes; 1285 int type; 1286 1287 count = 0; 1288 while (size - count >= HAMMER_MREC_HEADSIZE) { 1289 /* 1290 * Cached the record header in case we run out of buffer 1291 * space. 1292 */ 1293 fflush(stdout); 1294 if (pickup->signature == 0) { 1295 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1296 i = read(fd, (char *)pickup + n, 1297 HAMMER_MREC_HEADSIZE - n); 1298 if (i <= 0) 1299 break; 1300 } 1301 if (n == 0) 1302 break; 1303 if (n != HAMMER_MREC_HEADSIZE) { 1304 errx(1, "read_mrecords: short read on pipe"); 1305 /* not reached */ 1306 } 1307 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1308 errx(1, "read_mrecords: malformed record on pipe, " 1309 "bad signature"); 1310 /* not reached */ 1311 } 1312 } 1313 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1314 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1315 errx(1, "read_mrecords: malformed record on pipe, " 1316 "illegal rec_size"); 1317 /* not reached */ 1318 } 1319 1320 /* 1321 * Stop if we have insufficient space for the record and data. 1322 */ 1323 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1324 if (size - count < bytes) 1325 break; 1326 1327 /* 1328 * Stop if the record type is not a REC, SKIP, or PASS, 1329 * which are the only types the ioctl supports. Other types 1330 * are used only by the userland protocol. 1331 * 1332 * Ignore all flags. 1333 */ 1334 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1335 if (type != HAMMER_MREC_TYPE_PFSD && 1336 type != HAMMER_MREC_TYPE_REC && 1337 type != HAMMER_MREC_TYPE_SKIP && 1338 type != HAMMER_MREC_TYPE_PASS) { 1339 break; 1340 } 1341 1342 /* 1343 * Read the remainder and clear the pickup signature. 1344 */ 1345 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1346 i = read(fd, buf + count + n, bytes - n); 1347 if (i <= 0) 1348 break; 1349 } 1350 if (n != bytes) { 1351 errx(1, "read_mrecords: short read on pipe"); 1352 /* not reached */ 1353 } 1354 1355 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1356 pickup->signature = 0; 1357 pickup->type = 0; 1358 mrec = (void *)(buf + count); 1359 1360 /* 1361 * Validate the completed record 1362 */ 1363 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1364 errx(1, "read_mrecords: malformed record on pipe, bad crc"); 1365 /* not reached */ 1366 } 1367 1368 /* 1369 * If its a B-Tree record validate the data crc. 1370 * 1371 * NOTE: If the VFS passes us an explicitly errorde mrec 1372 * we just pass it through. 1373 */ 1374 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1375 1376 if (type == HAMMER_MREC_TYPE_REC) { 1377 if (mrec->head.rec_size < 1378 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1379 errx(1, "read_mrecords: malformed record on " 1380 "pipe, illegal element data_len"); 1381 /* not reached */ 1382 } 1383 if (mrec->rec.leaf.data_len && 1384 mrec->rec.leaf.data_offset && 1385 hammer_crc_test_leaf(HammerVersion, &mrec->rec + 1, &mrec->rec.leaf) == 0) { 1386 fprintf(stderr, 1387 "read_mrecords: data_crc did not " 1388 "match data! obj=%016jx key=%016jx\n", 1389 (uintmax_t)mrec->rec.leaf.base.obj_id, 1390 (uintmax_t)mrec->rec.leaf.base.key); 1391 fprintf(stderr, 1392 "continuing, but there are problems\n"); 1393 } 1394 } 1395 count += bytes; 1396 } 1397 return(count); 1398 } 1399 1400 /* 1401 * Read and return a single mrecord. 1402 */ 1403 static 1404 hammer_ioc_mrecord_any_t 1405 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1406 { 1407 hammer_ioc_mrecord_any_t mrec; 1408 struct hammer_ioc_mrecord_head mrechd; 1409 size_t bytes; 1410 size_t n; 1411 size_t i; 1412 1413 if (pickup && pickup->type != 0) { 1414 mrechd = *pickup; 1415 pickup->signature = 0; 1416 pickup->type = 0; 1417 n = HAMMER_MREC_HEADSIZE; 1418 } else { 1419 /* 1420 * Read in the PFSD header from the sender. 1421 */ 1422 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1423 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1424 if (i <= 0) 1425 break; 1426 } 1427 if (n == 0) { 1428 *errorp = 0; /* EOF */ 1429 return(NULL); 1430 } 1431 if (n != HAMMER_MREC_HEADSIZE) { 1432 fprintf(stderr, "short read of mrecord header\n"); 1433 *errorp = EPIPE; 1434 return(NULL); 1435 } 1436 } 1437 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1438 fprintf(stderr, "read_mrecord: bad signature\n"); 1439 *errorp = EINVAL; 1440 return(NULL); 1441 } 1442 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1443 assert(bytes >= sizeof(mrechd)); 1444 mrec = malloc(bytes); 1445 mrec->head = mrechd; 1446 1447 while (n < bytes) { 1448 i = read(fdin, (char *)mrec + n, bytes - n); 1449 if (i <= 0) 1450 break; 1451 n += i; 1452 } 1453 if (n != bytes) { 1454 fprintf(stderr, "read_mrecord: short read on payload\n"); 1455 *errorp = EPIPE; 1456 return(NULL); 1457 } 1458 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) { 1459 fprintf(stderr, "read_mrecord: bad CRC\n"); 1460 *errorp = EINVAL; 1461 return(NULL); 1462 } 1463 *errorp = 0; 1464 return(mrec); 1465 } 1466 1467 static 1468 void 1469 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec, 1470 int bytes) 1471 { 1472 char zbuf[HAMMER_HEAD_ALIGN]; 1473 int pad; 1474 1475 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1476 1477 assert(bytes >= (int)sizeof(mrec->head)); 1478 bzero(&mrec->head, sizeof(mrec->head)); 1479 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1480 mrec->head.type = type; 1481 mrec->head.rec_size = bytes; 1482 hammer_crc_set_mrec_head(&mrec->head, bytes); 1483 if (write(fdout, mrec, bytes) != bytes) { 1484 err(1, "write_mrecord"); 1485 /* not reached */ 1486 } 1487 if (pad) { 1488 bzero(zbuf, pad); 1489 if (write(fdout, zbuf, pad) != pad) { 1490 err(1, "write_mrecord"); 1491 /* not reached */ 1492 } 1493 } 1494 } 1495 1496 /* 1497 * Generate a mirroring header with the pfs information of the 1498 * originating filesytem. 1499 */ 1500 static void 1501 generate_mrec_header(int fd, int pfs_id, 1502 union hammer_ioc_mrecord_any *mrec_tmp) 1503 { 1504 struct hammer_ioc_pseudofs_rw pfs; 1505 1506 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1507 clrpfs(&pfs, &mrec_tmp->pfs.pfsd, pfs_id); 1508 1509 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1510 err(1, "Mirror-read: not a HAMMER fs/pseudofs!"); 1511 /* not reached */ 1512 } 1513 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1514 errx(1, "Mirror-read: HAMMER PFS version mismatch!"); 1515 /* not reached */ 1516 } 1517 mrec_tmp->pfs.version = pfs.version; 1518 } 1519 1520 /* 1521 * Validate the pfs information from the originating filesystem 1522 * against the target filesystem. shared_uuid must match. 1523 * 1524 * return -1 if we got a TERM record 1525 */ 1526 static int 1527 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1528 struct hammer_ioc_mrecord_head *pickup, 1529 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1530 { 1531 struct hammer_ioc_pseudofs_rw pfs; 1532 struct hammer_pseudofs_data pfsd; 1533 hammer_ioc_mrecord_any_t mrec; 1534 int error; 1535 1536 /* 1537 * Get the PFSD info from the target filesystem. 1538 */ 1539 clrpfs(&pfs, &pfsd, pfs_id); 1540 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1541 err(1, "mirror-write: not a HAMMER fs/pseudofs!"); 1542 /* not reached */ 1543 } 1544 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1545 errx(1, "mirror-write: HAMMER PFS version mismatch!"); 1546 /* not reached */ 1547 } 1548 1549 mrec = read_mrecord(fdin, &error, pickup); 1550 if (mrec == NULL) { 1551 if (error == 0) { 1552 errx(1, "validate_mrec_header: short read"); 1553 /* not reached */ 1554 } 1555 exit(1); 1556 } 1557 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1558 free(mrec); 1559 return(-1); 1560 } 1561 1562 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1563 errx(1, "validate_mrec_header: did not get expected " 1564 "PFSD record type"); 1565 /* not reached */ 1566 } 1567 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1568 errx(1, "validate_mrec_header: unexpected payload size"); 1569 /* not reached */ 1570 } 1571 if (mrec->pfs.version != pfs.version) { 1572 errx(1, "validate_mrec_header: Version mismatch"); 1573 /* not reached */ 1574 } 1575 1576 /* 1577 * Whew. Ok, is the read PFS info compatible with the target? 1578 */ 1579 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1580 sizeof(pfsd.shared_uuid)) != 0) { 1581 errx(1, "mirror-write: source and target have " 1582 "different shared-uuid's!"); 1583 /* not reached */ 1584 } 1585 if (is_target && hammer_is_pfs_master(&pfsd)) { 1586 errx(1, "mirror-write: target must be in slave mode"); 1587 /* not reached */ 1588 } 1589 if (tid_begp) 1590 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1591 if (tid_endp) 1592 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1593 free(mrec); 1594 return(0); 1595 } 1596 1597 static void 1598 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1599 { 1600 struct hammer_ioc_pseudofs_rw pfs; 1601 struct hammer_pseudofs_data pfsd; 1602 1603 clrpfs(&pfs, &pfsd, pfs_id); 1604 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1605 err(1, "update_pfs_snapshot (read)"); 1606 /* not reached */ 1607 } 1608 1609 if (pfsd.sync_end_tid != snapshot_tid) { 1610 pfsd.sync_end_tid = snapshot_tid; 1611 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1612 err(1, "update_pfs_snapshot (rewrite)"); 1613 /* not reached */ 1614 } 1615 if (VerboseOpt >= 2) { 1616 fprintf(stderr, 1617 "Mirror-write: Completed, updated snapshot " 1618 "to %016jx\n", 1619 (uintmax_t)snapshot_tid); 1620 fflush(stderr); 1621 } 1622 } 1623 } 1624 1625 /* 1626 * Bandwidth-limited write in chunks 1627 */ 1628 static 1629 ssize_t 1630 writebw(int fd, const void *buf, size_t nbytes, 1631 uint64_t *bwcount, struct timeval *tv1) 1632 { 1633 struct timeval tv2; 1634 size_t n; 1635 ssize_t r; 1636 ssize_t a; 1637 int usec; 1638 1639 a = 0; 1640 r = 0; 1641 while (nbytes) { 1642 if (*bwcount + nbytes > BandwidthOpt) 1643 n = BandwidthOpt - *bwcount; 1644 else 1645 n = nbytes; 1646 if (n) 1647 r = write(fd, buf, n); 1648 if (r >= 0) { 1649 a += r; 1650 nbytes -= r; 1651 buf = (const char *)buf + r; 1652 } 1653 if ((size_t)r != n) 1654 break; 1655 *bwcount += n; 1656 if (*bwcount >= BandwidthOpt) { 1657 gettimeofday(&tv2, NULL); 1658 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1659 (int)(tv2.tv_usec - tv1->tv_usec); 1660 if (usec >= 0 && usec < 1000000) 1661 usleep(1000000 - usec); 1662 gettimeofday(tv1, NULL); 1663 *bwcount -= BandwidthOpt; 1664 } 1665 } 1666 return(a ? a : r); 1667 } 1668 1669 /* 1670 * Get a yes or no answer from the terminal. The program may be run as 1671 * part of a two-way pipe so we cannot use stdin for this operation. 1672 */ 1673 static int 1674 getyntty(void) 1675 { 1676 char buf[256]; 1677 FILE *fp; 1678 int result; 1679 1680 fp = fopen("/dev/tty", "r"); 1681 if (fp == NULL) { 1682 fprintf(stderr, "No terminal for response\n"); 1683 return(-1); 1684 } 1685 result = -1; 1686 while (fgets(buf, sizeof(buf), fp) != NULL) { 1687 if (buf[0] == 'y' || buf[0] == 'Y') { 1688 result = 1; 1689 break; 1690 } 1691 if (buf[0] == 'n' || buf[0] == 'N') { 1692 result = 0; 1693 break; 1694 } 1695 fprintf(stderr, "Response not understood\n"); 1696 break; 1697 } 1698 fclose(fp); 1699 return(result); 1700 } 1701 1702 static void 1703 score_printf(size_t i, size_t w, const char *ctl, ...) 1704 { 1705 va_list va; 1706 size_t n; 1707 static size_t SSize; 1708 static int SFd = -1; 1709 static char ScoreBuf[1024]; 1710 1711 if (ScoreBoardFile == NULL) 1712 return; 1713 assert(i + w < sizeof(ScoreBuf)); 1714 if (SFd < 0) { 1715 SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644); 1716 if (SFd < 0) 1717 return; 1718 SSize = 0; 1719 } 1720 for (n = 0; n < i; ++n) { 1721 if (ScoreBuf[n] == 0) 1722 ScoreBuf[n] = ' '; 1723 } 1724 va_start(va, ctl); 1725 vsnprintf(ScoreBuf + i, w - 1, ctl, va); 1726 va_end(va); 1727 n = strlen(ScoreBuf + i); 1728 while (n < w - 1) { 1729 ScoreBuf[i + n] = ' '; 1730 ++n; 1731 } 1732 ScoreBuf[i + n] = '\n'; 1733 if (SSize < i + w) 1734 SSize = i + w; 1735 pwrite(SFd, ScoreBuf, SSize, 0); 1736 } 1737 1738 static void 1739 hammer_check_restrict(const char *filesystem) 1740 { 1741 size_t rlen; 1742 int atslash; 1743 1744 if (RestrictTarget == NULL) 1745 return; 1746 rlen = strlen(RestrictTarget); 1747 if (strncmp(filesystem, RestrictTarget, rlen) != 0) { 1748 errx(1, "hammer-remote: restricted target"); 1749 /* not reached */ 1750 } 1751 1752 atslash = 1; 1753 while (filesystem[rlen]) { 1754 if (atslash && 1755 filesystem[rlen] == '.' && 1756 filesystem[rlen+1] == '.') { 1757 errx(1, "hammer-remote: '..' not allowed"); 1758 /* not reached */ 1759 } 1760 if (filesystem[rlen] == '/') 1761 atslash = 1; 1762 else 1763 atslash = 0; 1764 ++rlen; 1765 } 1766 } 1767 1768 static void 1769 mirror_usage(int code) 1770 { 1771 fprintf(stderr, 1772 "hammer mirror-read <filesystem> [begin-tid]\n" 1773 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1774 "hammer mirror-write <filesystem>\n" 1775 "hammer mirror-dump [header]\n" 1776 "hammer mirror-copy [[user@]host:]<filesystem>" 1777 " [[user@]host:]<filesystem>\n" 1778 "hammer mirror-stream [[user@]host:]<filesystem>" 1779 " [[user@]host:]<filesystem>\n" 1780 ); 1781 exit(code); 1782 } 1783 1784