1 /* 2 * Copyright (c) 2008 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 
*/

#include "hammer.h"

/*
 * Scoreboard field positions.  Each macro expands to the first two
 * arguments (i, w) of score_printf().  NOTE(review): presumably a
 * starting offset and a field width within the scoreboard line --
 * confirm against score_printf().
 */
#define LINE1	0,20
#define LINE2	20,78
#define LINE3	90,70

/*
 * Size of the buffer holding the serialized mrecords exchanged in one
 * HAMMERIOC_MIRROR_READ / HAMMERIOC_MIRROR_WRITE round trip.
 */
#define SERIALBUF_SIZE	(512 * 1024)

/*
 * One slice boundary computed by generate_histogram().
 *
 * tid:   boundary TID.  An entry's tid is also the (inclusive) ending
 *        TID of the previous slice.
 * bytes: estimated amount of data in the slice starting at this entry.
 *        The final entry of an array uses bytes = -1 as a terminator.
 */
typedef struct histogram {
	hammer_tid_t	tid;
	uint64_t	bytes;
} *histogram_t;

/*
 * Option values shared with the rest of the program.  NOTE(review):
 * presumably filled in by command-line parsing elsewhere and consumed
 * by score_printf() and hammer_check_restrict() -- confirm.
 */
const char *ScoreBoardFile;
const char *RestrictTarget;

/* File-local helpers (static, defined in this file). */
static int read_mrecords(int fd, char *buf, u_int size,
			 hammer_ioc_mrecord_head_t pickup);
static int generate_histogram(int fd, const char *filesystem,
			histogram_t *histogram_ary,
			struct hammer_ioc_mirror_rw *mirror_base,
			int *repeatp);
static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
			 hammer_ioc_mrecord_head_t pickup);
static void write_mrecord(int fdout, uint32_t type,
			 hammer_ioc_mrecord_any_t mrec, int bytes);
static void generate_mrec_header(int fd, int pfs_id,
			 union hammer_ioc_mrecord_any *mrec_tmp);
static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
			 struct hammer_ioc_mrecord_head *pickup,
			 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
static ssize_t writebw(int fd, const void *buf, size_t nbytes,
			uint64_t *bwcount, struct timeval *tv1);
static int getyntty(void);
static void score_printf(size_t i, size_t w, const char *ctl, ...);
static void hammer_check_restrict(const char *filesystem);
static void mirror_usage(int code);

/*
 * Generate a mirroring data stream from the specified source over the
 * entire key range, but restricted to the specified transaction range.
 *
 * The HAMMER VFS does most of the work; we add a few new mrecord
 * types to negotiate the TID ranges and to verify that the entire
 * stream made it to the destination.
 *
 * streaming will be 0 for mirror-read, 1 for mirror-read-stream.  The
 * code will set up a fake value of -1 when running the histogram for
 * mirror-read.
84 */ 85 void 86 hammer_cmd_mirror_read(char **av, int ac, int streaming) 87 { 88 struct hammer_ioc_mirror_rw mirror; 89 struct hammer_ioc_pseudofs_rw pfs; 90 union hammer_ioc_mrecord_any mrec_tmp; 91 struct hammer_ioc_mrecord_head pickup; 92 hammer_ioc_mrecord_any_t mrec; 93 hammer_tid_t sync_tid; 94 histogram_t histogram_ary; 95 char *filesystem; 96 char *buf = malloc(SERIALBUF_SIZE); 97 int interrupted = 0; 98 int error; 99 int fd; 100 int n; 101 int didwork; 102 int histogram; 103 int histindex; 104 int histmax; 105 int repeat = 0; 106 int sameline; 107 int64_t total_bytes; 108 time_t base_t = time(NULL); 109 struct timeval bwtv; 110 uint64_t bwcount; 111 uint64_t estbytes; 112 113 if (ac == 0 || ac > 2) 114 mirror_usage(1); 115 filesystem = av[0]; 116 hammer_check_restrict(filesystem); 117 118 pickup.signature = 0; 119 pickup.type = 0; 120 histogram = 0; 121 histindex = 0; 122 histmax = 0; 123 histogram_ary = NULL; 124 sameline = 0; 125 126 again: 127 bzero(&mirror, sizeof(mirror)); 128 hammer_key_beg_init(&mirror.key_beg); 129 hammer_key_end_init(&mirror.key_end); 130 131 fd = getpfs(&pfs, filesystem); 132 133 if (streaming >= 0) 134 score_printf(LINE1, "Running"); 135 136 if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) { 137 fprintf(stderr, "%cRunning \b\b", (sameline ? '\r' : '\n')); 138 fflush(stderr); 139 sameline = 1; 140 } 141 sameline = 1; 142 total_bytes = 0; 143 gettimeofday(&bwtv, NULL); 144 bwcount = 0; 145 146 /* 147 * Send initial header for the purpose of determining the 148 * shared-uuid. 149 */ 150 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 151 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 152 &mrec_tmp, sizeof(mrec_tmp.pfs)); 153 154 /* 155 * In 2-way mode the target will send us a PFS info packet 156 * first. Use the target's current snapshot TID as our default 157 * begin TID. 
158 */ 159 if (TwoWayPipeOpt) { 160 mirror.tid_beg = 0; 161 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup, 162 NULL, &mirror.tid_beg); 163 if (n < 0) { /* got TERM record */ 164 relpfs(fd, &pfs); 165 free(buf); 166 free(histogram_ary); 167 return; 168 } 169 ++mirror.tid_beg; 170 } else if (streaming && histogram) { 171 mirror.tid_beg = histogram_ary[histindex].tid + 1; 172 } else { 173 mirror.tid_beg = 0; 174 } 175 176 /* 177 * Write out the PFS header, tid_beg will be updated if our PFS 178 * has a larger begin sync. tid_end is set to the latest source 179 * TID whos flush cycle has completed. 180 */ 181 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 182 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 183 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 184 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 185 mirror.ubuf = buf; 186 mirror.size = SERIALBUF_SIZE; 187 mirror.pfs_id = pfs.pfs_id; 188 mirror.shared_uuid = pfs.ondisk->shared_uuid; 189 190 /* 191 * XXX If the histogram is exhausted and the TID delta is large 192 * the stream might have been offline for a while and is 193 * now picking it up again. Do another histogram. 194 */ 195 #if 0 196 if (streaming && histogram && histindex == histend) { 197 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM) 198 histogram = 0; 199 } 200 #endif 201 202 /* 203 * Initial bulk startup control, try to do some incremental 204 * mirroring in order to allow the stream to be killed and 205 * restarted without having to start over. 
206 */ 207 if (histogram == 0 && BulkOpt == 0) { 208 if (VerboseOpt && repeat == 0) { 209 fprintf(stderr, "\n"); 210 sameline = 0; 211 } 212 histmax = generate_histogram(fd, filesystem, 213 &histogram_ary, &mirror, 214 &repeat); 215 histindex = 0; 216 histogram = 1; 217 218 /* 219 * Just stream the histogram, then stop 220 */ 221 if (streaming == 0) 222 streaming = -1; 223 } 224 225 if (streaming && histogram) { 226 ++histindex; 227 mirror.tid_end = histogram_ary[histindex].tid; 228 estbytes = histogram_ary[histindex-1].bytes; 229 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end; 230 } else { 231 estbytes = 0; 232 } 233 234 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 235 &mrec_tmp, sizeof(mrec_tmp.pfs)); 236 237 /* 238 * A cycle file overrides the beginning TID only if we are 239 * not operating in two-way or histogram mode. 240 */ 241 if (TwoWayPipeOpt == 0 && histogram == 0) { 242 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg); 243 } 244 245 /* 246 * An additional argument overrides the beginning TID regardless 247 * of what mode we are in. This is not recommending if operating 248 * in two-way mode. 249 */ 250 if (ac == 2) 251 mirror.tid_beg = strtoull(av[1], NULL, 0); 252 253 if (streaming == 0 || VerboseOpt >= 2) { 254 fprintf(stderr, 255 "Mirror-read: Mirror %016jx to %016jx", 256 (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end); 257 if (histogram) 258 fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes); 259 fprintf(stderr, "\n"); 260 fflush(stderr); 261 } 262 if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) { 263 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n", 264 (uintmax_t)mirror.key_beg.obj_id); 265 } 266 267 /* 268 * Nothing to do if begin equals end. 
269 */ 270 if (mirror.tid_beg >= mirror.tid_end) { 271 if (streaming == 0 || VerboseOpt >= 2) 272 fprintf(stderr, "Mirror-read: No work to do\n"); 273 sleep(DelayOpt); 274 didwork = 0; 275 histogram = 0; 276 goto done; 277 } 278 didwork = 1; 279 280 /* 281 * Write out bulk records 282 */ 283 mirror.ubuf = buf; 284 mirror.size = SERIALBUF_SIZE; 285 286 do { 287 mirror.count = 0; 288 mirror.pfs_id = pfs.pfs_id; 289 mirror.shared_uuid = pfs.ondisk->shared_uuid; 290 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 291 score_printf(LINE3, "Mirror-read %s failed: %s", 292 filesystem, strerror(errno)); 293 fprintf(stderr, "Mirror-read %s failed: %s\n", 294 filesystem, strerror(errno)); 295 exit(1); 296 } 297 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 298 score_printf(LINE3, "Mirror-read %s fatal error %d", 299 filesystem, mirror.head.error); 300 fprintf(stderr, 301 "Mirror-read %s fatal error %d\n", 302 filesystem, mirror.head.error); 303 exit(1); 304 } 305 if (mirror.count) { 306 if (BandwidthOpt) { 307 n = writebw(1, mirror.ubuf, mirror.count, 308 &bwcount, &bwtv); 309 } else { 310 n = write(1, mirror.ubuf, mirror.count); 311 } 312 if (n != mirror.count) { 313 score_printf(LINE3, 314 "Mirror-read %s failed: " 315 "short write", 316 filesystem); 317 fprintf(stderr, 318 "Mirror-read %s failed: " 319 "short write\n", 320 filesystem); 321 exit(1); 322 } 323 } 324 total_bytes += mirror.count; 325 if (streaming && VerboseOpt) { 326 fprintf(stderr, 327 "\rscan obj=%016jx tids=%016jx:%016jx %11jd", 328 (uintmax_t)mirror.key_cur.obj_id, 329 (uintmax_t)mirror.tid_beg, 330 (uintmax_t)mirror.tid_end, 331 (intmax_t)total_bytes); 332 fflush(stderr); 333 sameline = 0; 334 } else if (streaming) { 335 score_printf(LINE2, 336 "obj=%016jx tids=%016jx:%016jx %11jd", 337 (uintmax_t)mirror.key_cur.obj_id, 338 (uintmax_t)mirror.tid_beg, 339 (uintmax_t)mirror.tid_end, 340 (intmax_t)total_bytes); 341 } 342 mirror.key_beg = mirror.key_cur; 343 344 /* 345 * Deal with time limit option 
346 */ 347 if (TimeoutOpt && 348 (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) { 349 score_printf(LINE3, 350 "Mirror-read %s interrupted by timer at" 351 " %016jx", 352 filesystem, 353 (uintmax_t)mirror.key_cur.obj_id); 354 fprintf(stderr, 355 "Mirror-read %s interrupted by timer at" 356 " %016jx\n", 357 filesystem, 358 (uintmax_t)mirror.key_cur.obj_id); 359 interrupted = 1; 360 break; 361 } 362 } while (mirror.count != 0); 363 364 done: 365 if (streaming && VerboseOpt && sameline == 0) { 366 fprintf(stderr, "\n"); 367 fflush(stderr); 368 sameline = 1; 369 } 370 371 /* 372 * Write out the termination sync record - only if not interrupted 373 */ 374 if (interrupted == 0) { 375 if (didwork) { 376 write_mrecord(1, HAMMER_MREC_TYPE_SYNC, 377 &mrec_tmp, sizeof(mrec_tmp.sync)); 378 } else { 379 write_mrecord(1, HAMMER_MREC_TYPE_IDLE, 380 &mrec_tmp, sizeof(mrec_tmp.sync)); 381 } 382 } 383 384 /* 385 * If the -2 option was given (automatic when doing mirror-copy), 386 * a two-way pipe is assumed and we expect a response mrec from 387 * the target. 388 */ 389 if (TwoWayPipeOpt) { 390 mrec = read_mrecord(0, &error, &pickup); 391 if (mrec == NULL || 392 mrec->head.type != HAMMER_MREC_TYPE_UPDATE || 393 mrec->head.rec_size != sizeof(mrec->update)) { 394 fprintf(stderr, "mirror_read: Did not get final " 395 "acknowledgement packet from target\n"); 396 exit(1); 397 } 398 if (interrupted) { 399 if (CyclePath) { 400 hammer_set_cycle(&mirror.key_cur, 401 mirror.tid_beg); 402 fprintf(stderr, "Cyclefile %s updated for " 403 "continuation\n", CyclePath); 404 } 405 } else { 406 sync_tid = mrec->update.tid; 407 if (CyclePath) { 408 hammer_key_beg_init(&mirror.key_beg); 409 hammer_set_cycle(&mirror.key_beg, sync_tid); 410 fprintf(stderr, 411 "Cyclefile %s updated to 0x%016jx\n", 412 CyclePath, (uintmax_t)sync_tid); 413 } 414 } 415 free(mrec); 416 } else if (CyclePath) { 417 /* NOTE! 
mirror.tid_beg cannot be updated */ 418 fprintf(stderr, "Warning: cycle file (-c option) cannot be " 419 "fully updated unless you use mirror-copy\n"); 420 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg); 421 } 422 if (streaming && interrupted == 0) { 423 time_t t1 = time(NULL); 424 time_t t2; 425 426 /* 427 * Try to break down large bulk transfers into smaller ones 428 * so it can sync the transaction id on the slave. This 429 * way if we get interrupted a restart doesn't have to 430 * start from scratch. 431 */ 432 if (streaming && histogram) { 433 if (histindex != histmax) { 434 if (VerboseOpt && VerboseOpt < 2 && 435 streaming >= 0) { 436 fprintf(stderr, " (bulk incremental)"); 437 } 438 relpfs(fd, &pfs); 439 goto again; 440 } 441 } 442 443 if (VerboseOpt && streaming >= 0) { 444 fprintf(stderr, " W"); 445 fflush(stderr); 446 } else if (streaming >= 0) { 447 score_printf(LINE1, "Waiting"); 448 } 449 pfs.ondisk->sync_end_tid = mirror.tid_end; 450 if (streaming < 0) { 451 /* 452 * Fake streaming mode when using a histogram to 453 * break up a mirror-read, do not wait on source. 
454 */ 455 streaming = 0; 456 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) { 457 score_printf(LINE3, 458 "Mirror-read %s: cannot stream: %s\n", 459 filesystem, strerror(errno)); 460 fprintf(stderr, 461 "Mirror-read %s: cannot stream: %s\n", 462 filesystem, strerror(errno)); 463 } else { 464 t2 = time(NULL) - t1; 465 if (t2 >= 0 && t2 < DelayOpt) { 466 if (VerboseOpt) { 467 fprintf(stderr, "\bD"); 468 fflush(stderr); 469 } 470 sleep(DelayOpt - t2); 471 } 472 if (VerboseOpt) { 473 fprintf(stderr, "\b "); 474 fflush(stderr); 475 } 476 relpfs(fd, &pfs); 477 goto again; 478 } 479 } 480 write_mrecord(1, HAMMER_MREC_TYPE_TERM, 481 &mrec_tmp, sizeof(mrec_tmp.sync)); 482 relpfs(fd, &pfs); 483 free(buf); 484 free(histogram_ary); 485 fprintf(stderr, "Mirror-read %s succeeded\n", filesystem); 486 } 487 488 /* 489 * What we are trying to do here is figure out how much data is 490 * going to be sent for the TID range and to break the TID range 491 * down into reasonably-sized slices (from the point of view of 492 * data sent) so a lost connection can restart at a reasonable 493 * place and not all the way back at the beginning. 494 * 495 * An entry's TID serves as the end_tid for the prior entry 496 * So we have to offset the calculation by 1 so that TID falls into 497 * the previous entry when populating entries. 498 * 499 * Because the transaction id space is bursty we need a relatively 500 * large number of buckets (like a million) to do a reasonable job 501 * for things like an initial bulk mirrors on a very large filesystem. 
502 */ 503 #define HIST_COUNT (1024 * 1024) 504 505 static int 506 generate_histogram(int fd, const char *filesystem, 507 histogram_t *histogram_ary, 508 struct hammer_ioc_mirror_rw *mirror_base, 509 int *repeatp) 510 { 511 struct hammer_ioc_mirror_rw mirror; 512 union hammer_ioc_mrecord_any *mrec; 513 hammer_tid_t tid_beg; 514 hammer_tid_t tid_end; 515 hammer_tid_t tid; 516 hammer_tid_t tidx; 517 uint64_t *tid_bytes; 518 uint64_t total; 519 uint64_t accum; 520 int chunkno; 521 int i; 522 int res; 523 int off; 524 int len; 525 526 mirror = *mirror_base; 527 tid_beg = mirror.tid_beg; 528 tid_end = mirror.tid_end; 529 mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA; 530 531 if (*histogram_ary == NULL) { 532 *histogram_ary = malloc(sizeof(struct histogram) * 533 (HIST_COUNT + 2)); 534 } 535 if (tid_beg >= tid_end) 536 return(0); 537 538 /* needs 2 extra */ 539 tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2)); 540 bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2)); 541 542 if (*repeatp == 0) { 543 fprintf(stderr, "Prescan to break up bulk transfer"); 544 if (VerboseOpt > 1) 545 fprintf(stderr, " (%juMB chunks)", 546 (uintmax_t)(SplitupOpt / (1024 * 1024))); 547 fprintf(stderr, "\n"); 548 } 549 550 /* 551 * Note: (tid_beg,tid_end), range is inclusive of both beg & end. 552 * 553 * Note: Estimates can be off when the mirror is way behind due 554 * to skips. 
555 */ 556 total = 0; 557 accum = 0; 558 chunkno = 0; 559 for (;;) { 560 mirror.count = 0; 561 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) { 562 fprintf(stderr, "Mirror-read %s failed: %s\n", 563 filesystem, strerror(errno)); 564 exit(1); 565 } 566 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 567 fprintf(stderr, 568 "Mirror-read %s fatal error %d\n", 569 filesystem, mirror.head.error); 570 exit(1); 571 } 572 for (off = 0; 573 off < mirror.count; 574 off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) { 575 mrec = (void *)((char *)mirror.ubuf + off); 576 577 /* 578 * We only care about general RECs and PASS 579 * records. We ignore SKIPs. 580 */ 581 switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) { 582 case HAMMER_MREC_TYPE_REC: 583 case HAMMER_MREC_TYPE_PASS: 584 break; 585 default: 586 continue; 587 } 588 589 /* 590 * Calculate for two indices, create_tid and 591 * delete_tid. Record data only applies to 592 * the create_tid. 593 * 594 * When tid is exactly on the boundary it really 595 * belongs to the previous entry because scans 596 * are inclusive of the ending entry. 
597 */ 598 tid = mrec->rec.leaf.base.delete_tid; 599 if (tid && tid >= tid_beg && tid <= tid_end) { 600 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 601 if (mrec->head.type == 602 HAMMER_MREC_TYPE_REC) { 603 len -= HAMMER_HEAD_DOALIGN( 604 mrec->rec.leaf.data_len); 605 assert(len > 0); 606 } 607 i = (tid - tid_beg) * HIST_COUNT / 608 (tid_end - tid_beg); 609 tidx = tid_beg + i * (tid_end - tid_beg) / 610 HIST_COUNT; 611 if (tid == tidx && i) 612 --i; 613 assert(i >= 0 && i < HIST_COUNT); 614 tid_bytes[i] += len; 615 total += len; 616 accum += len; 617 } 618 619 tid = mrec->rec.leaf.base.create_tid; 620 if (tid && tid >= tid_beg && tid <= tid_end) { 621 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 622 if (mrec->head.type == 623 HAMMER_MREC_TYPE_REC_NODATA) { 624 len += HAMMER_HEAD_DOALIGN( 625 mrec->rec.leaf.data_len); 626 } 627 i = (tid - tid_beg) * HIST_COUNT / 628 (tid_end - tid_beg); 629 tidx = tid_beg + i * (tid_end - tid_beg) / 630 HIST_COUNT; 631 if (tid == tidx && i) 632 --i; 633 assert(i >= 0 && i < HIST_COUNT); 634 tid_bytes[i] += len; 635 total += len; 636 accum += len; 637 } 638 } 639 if (*repeatp == 0 && accum > SplitupOpt) { 640 if (VerboseOpt > 1) { 641 fprintf(stderr, "."); 642 fflush(stderr); 643 } 644 ++chunkno; 645 score_printf(LINE2, "Prescan chunk %d", chunkno); 646 accum = 0; 647 } 648 if (mirror.count == 0) 649 break; 650 mirror.key_beg = mirror.key_cur; 651 } 652 653 /* 654 * Reduce to SplitupOpt (default 4GB) chunks. This code may 655 * use up to two additional elements. Do the array in-place. 656 * 657 * Inefficient degenerate cases can occur if we do not accumulate 658 * at least the requested split amount, so error on the side of 659 * going over a bit. 
660 */ 661 res = 0; 662 (*histogram_ary)[res].tid = tid_beg; 663 (*histogram_ary)[res].bytes = tid_bytes[0]; 664 for (i = 1; i < HIST_COUNT; ++i) { 665 if ((*histogram_ary)[res].bytes >= SplitupOpt) { 666 ++res; 667 (*histogram_ary)[res].tid = tid_beg + 668 i * (tid_end - tid_beg) / 669 HIST_COUNT; 670 (*histogram_ary)[res].bytes = 0; 671 672 } 673 (*histogram_ary)[res].bytes += tid_bytes[i]; 674 } 675 ++res; 676 (*histogram_ary)[res].tid = tid_end; 677 (*histogram_ary)[res].bytes = -1; 678 679 if (*repeatp == 0) { 680 if (VerboseOpt > 1) 681 fprintf(stderr, "\n"); /* newline after ... */ 682 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes", 683 res, (uintmax_t)total / (1024 * 1024)); 684 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (", 685 res, (uintmax_t)total / (1024 * 1024)); 686 for (i = 0; i < res && i < 3; ++i) { 687 if (i) 688 fprintf(stderr, ", "); 689 fprintf(stderr, "%ju", 690 (uintmax_t)(*histogram_ary)[i].bytes); 691 } 692 if (i < res) 693 fprintf(stderr, ", ..."); 694 fprintf(stderr, ")\n"); 695 } 696 assert(res <= HIST_COUNT); 697 *repeatp = 1; 698 699 free(tid_bytes); 700 return(res); 701 } 702 703 static void 704 create_pfs(const char *filesystem, uuid_t *s_uuid) 705 { 706 if (ForceYesOpt == 1) { 707 fprintf(stderr, "PFS slave %s does not exist. " 708 "Auto create new slave PFS!\n", filesystem); 709 710 } else { 711 fprintf(stderr, "PFS slave %s does not exist.\n" 712 "Do you want to create a new slave PFS? 
(yes|no) ", 713 filesystem); 714 fflush(stderr); 715 if (getyntty() != 1) { 716 fprintf(stderr, "Aborting operation\n"); 717 exit(1); 718 } 719 } 720 721 uint32_t status; 722 char *shared_uuid = NULL; 723 uuid_to_string(s_uuid, &shared_uuid, &status); 724 725 char *cmd = NULL; 726 asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2", 727 filesystem, shared_uuid); 728 free(shared_uuid); 729 730 if (cmd == NULL) { 731 fprintf(stderr, "Failed to alloc memory\n"); 732 exit(1); 733 } 734 if (system(cmd) != 0) { 735 fprintf(stderr, "Failed to create PFS\n"); 736 } 737 free(cmd); 738 } 739 740 /* 741 * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding 742 * some additional packet types to negotiate TID ranges and to verify 743 * completion. The HAMMER VFS does most of the work. 744 * 745 * It is important to note that the mirror.key_{beg,end} range must 746 * match the ranged used by the original. For now both sides use 747 * range the entire key space. 748 * 749 * It is even more important that the records in the stream conform 750 * to the TID range also supplied in the stream. The HAMMER VFS will 751 * use the REC, PASS, and SKIP record types to track the portions of 752 * the B-Tree being scanned in order to be able to proactively delete 753 * records on the target within those active areas that are not mentioned 754 * by the source. 755 * 756 * The mirror.key_cur field is used by the VFS to do this tracking. It 757 * must be initialized to key_beg but then is persistently updated by 758 * the HAMMER VFS on each successive ioctl() call. If you blow up this 759 * field you will blow up the mirror target, possibly to the point of 760 * deleting everything. As a safety measure the HAMMER VFS simply marks 761 * the records that the source has destroyed as deleted on the target, 762 * and normal pruning operations will deal with their final disposition 763 * at some later time. 
764 */ 765 void 766 hammer_cmd_mirror_write(char **av, int ac) 767 { 768 struct hammer_ioc_mirror_rw mirror; 769 char *filesystem; 770 char *buf = malloc(SERIALBUF_SIZE); 771 struct hammer_ioc_pseudofs_rw pfs; 772 struct hammer_ioc_mrecord_head pickup; 773 struct hammer_ioc_synctid synctid; 774 union hammer_ioc_mrecord_any mrec_tmp; 775 hammer_ioc_mrecord_any_t mrec; 776 struct stat st; 777 int error; 778 int fd; 779 int n; 780 781 if (ac != 1) 782 mirror_usage(1); 783 filesystem = av[0]; 784 hammer_check_restrict(filesystem); 785 786 pickup.signature = 0; 787 pickup.type = 0; 788 789 again: 790 bzero(&mirror, sizeof(mirror)); 791 hammer_key_beg_init(&mirror.key_beg); 792 hammer_key_end_init(&mirror.key_end); 793 mirror.key_end = mirror.key_beg; 794 795 /* 796 * Read initial packet 797 */ 798 mrec = read_mrecord(0, &error, &pickup); 799 if (mrec == NULL) { 800 if (error == 0) 801 fprintf(stderr, "validate_mrec_header: short read\n"); 802 exit(1); 803 } 804 /* 805 * Validate packet 806 */ 807 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 808 free(buf); 809 return; 810 } 811 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 812 fprintf(stderr, "validate_mrec_header: did not get expected " 813 "PFSD record type\n"); 814 exit(1); 815 } 816 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 817 fprintf(stderr, "validate_mrec_header: unexpected payload " 818 "size\n"); 819 exit(1); 820 } 821 822 /* 823 * Create slave PFS if it doesn't yet exist 824 */ 825 if (lstat(filesystem, &st) != 0) { 826 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid); 827 } 828 free(mrec); 829 mrec = NULL; 830 831 fd = getpfs(&pfs, filesystem); 832 833 /* 834 * In two-way mode the target writes out a PFS packet first. 835 * The source uses our tid_end as its tid_beg by default, 836 * picking up where it left off. 
837 */ 838 mirror.tid_beg = 0; 839 if (TwoWayPipeOpt) { 840 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp); 841 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid) 842 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid; 843 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid; 844 write_mrecord(1, HAMMER_MREC_TYPE_PFSD, 845 &mrec_tmp, sizeof(mrec_tmp.pfs)); 846 } 847 848 /* 849 * Read and process the PFS header. The source informs us of 850 * the TID range the stream represents. 851 */ 852 n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup, 853 &mirror.tid_beg, &mirror.tid_end); 854 if (n < 0) { /* got TERM record */ 855 relpfs(fd, &pfs); 856 free(buf); 857 return; 858 } 859 860 mirror.ubuf = buf; 861 mirror.size = SERIALBUF_SIZE; 862 863 /* 864 * Read and process bulk records (REC, PASS, and SKIP types). 865 * 866 * On your life, do NOT mess with mirror.key_cur or your mirror 867 * target may become history. 868 */ 869 for (;;) { 870 mirror.count = 0; 871 mirror.pfs_id = pfs.pfs_id; 872 mirror.shared_uuid = pfs.ondisk->shared_uuid; 873 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 874 if (mirror.size <= 0) 875 break; 876 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) { 877 fprintf(stderr, "Mirror-write %s failed: %s\n", 878 filesystem, strerror(errno)); 879 exit(1); 880 } 881 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) { 882 fprintf(stderr, 883 "Mirror-write %s fatal error %d\n", 884 filesystem, mirror.head.error); 885 exit(1); 886 } 887 #if 0 888 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) { 889 fprintf(stderr, 890 "Mirror-write %s interrupted by timer at" 891 " %016llx\n", 892 filesystem, 893 mirror.key_cur.obj_id); 894 exit(0); 895 } 896 #endif 897 } 898 899 /* 900 * Read and process the termination sync record. 
901 */ 902 mrec = read_mrecord(0, &error, &pickup); 903 904 if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) { 905 fprintf(stderr, "Mirror-write: received termination request\n"); 906 relpfs(fd, &pfs); 907 free(mrec); 908 free(buf); 909 return; 910 } 911 912 if (mrec == NULL || 913 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 914 mrec->head.type != HAMMER_MREC_TYPE_IDLE) || 915 mrec->head.rec_size != sizeof(mrec->sync)) { 916 fprintf(stderr, "Mirror-write %s: Did not get termination " 917 "sync record, or rec_size is wrong rt=%d\n", 918 filesystem, 919 (mrec ? (int)mrec->head.type : -1)); 920 exit(1); 921 } 922 923 /* 924 * Update the PFS info on the target so the user has visibility 925 * into the new snapshot, and sync the target filesystem. 926 */ 927 if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) { 928 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id); 929 930 bzero(&synctid, sizeof(synctid)); 931 synctid.op = HAMMER_SYNCTID_SYNC2; 932 ioctl(fd, HAMMERIOC_SYNCTID, &synctid); 933 934 if (VerboseOpt >= 2) { 935 fprintf(stderr, "Mirror-write %s: succeeded\n", 936 filesystem); 937 } 938 } 939 940 free(mrec); 941 mrec = NULL; 942 943 /* 944 * Report back to the originator. 
945 */ 946 if (TwoWayPipeOpt) { 947 mrec_tmp.update.tid = mirror.tid_end; 948 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE, 949 &mrec_tmp, sizeof(mrec_tmp.update)); 950 } else { 951 printf("Source can update synctid to 0x%016jx\n", 952 (uintmax_t)mirror.tid_end); 953 } 954 relpfs(fd, &pfs); 955 goto again; 956 } 957 958 void 959 hammer_cmd_mirror_dump(char **av, int ac) 960 { 961 char *buf = malloc(SERIALBUF_SIZE); 962 struct hammer_ioc_mrecord_head pickup; 963 hammer_ioc_mrecord_any_t mrec; 964 int error; 965 int size; 966 int offset; 967 int bytes; 968 int header_only = 0; 969 970 if (ac == 1 && strcmp(*av, "header") == 0) 971 header_only = 1; 972 else if (ac != 0) 973 mirror_usage(1); 974 975 /* 976 * Read and process the PFS header 977 */ 978 pickup.signature = 0; 979 pickup.type = 0; 980 981 mrec = read_mrecord(0, &error, &pickup); 982 983 /* 984 * Dump the PFS header. mirror-dump takes its input from the output 985 * of a mirror-read so getpfs() can't be used to get a fd to be passed 986 * to dump_pfsd(). 
987 */ 988 if (header_only && mrec != NULL) { 989 dump_pfsd(&mrec->pfs.pfsd, -1); 990 free(mrec); 991 free(buf); 992 return; 993 } 994 free(mrec); 995 996 again: 997 /* 998 * Read and process bulk records 999 */ 1000 for (;;) { 1001 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup); 1002 if (size <= 0) 1003 break; 1004 offset = 0; 1005 while (offset < size) { 1006 mrec = (void *)((char *)buf + offset); 1007 bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size); 1008 if (offset + bytes > size) { 1009 fprintf(stderr, "Misaligned record\n"); 1010 exit(1); 1011 } 1012 1013 switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) { 1014 case HAMMER_MREC_TYPE_REC_BADCRC: 1015 case HAMMER_MREC_TYPE_REC: 1016 printf("Record lo=%08x obj=%016jx key=%016jx " 1017 "rt=%02x ot=%02x", 1018 mrec->rec.leaf.base.localization, 1019 (uintmax_t)mrec->rec.leaf.base.obj_id, 1020 (uintmax_t)mrec->rec.leaf.base.key, 1021 mrec->rec.leaf.base.rec_type, 1022 mrec->rec.leaf.base.obj_type); 1023 if (mrec->head.type == 1024 HAMMER_MREC_TYPE_REC_BADCRC) { 1025 printf(" (BAD CRC)"); 1026 } 1027 printf("\n"); 1028 printf(" tids %016jx:%016jx data=%d\n", 1029 (uintmax_t)mrec->rec.leaf.base.create_tid, 1030 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1031 mrec->rec.leaf.data_len); 1032 break; 1033 case HAMMER_MREC_TYPE_PASS: 1034 printf("Pass lo=%08x obj=%016jx key=%016jx " 1035 "rt=%02x ot=%02x\n", 1036 mrec->rec.leaf.base.localization, 1037 (uintmax_t)mrec->rec.leaf.base.obj_id, 1038 (uintmax_t)mrec->rec.leaf.base.key, 1039 mrec->rec.leaf.base.rec_type, 1040 mrec->rec.leaf.base.obj_type); 1041 printf(" tids %016jx:%016jx data=%d\n", 1042 (uintmax_t)mrec->rec.leaf.base.create_tid, 1043 (uintmax_t)mrec->rec.leaf.base.delete_tid, 1044 mrec->rec.leaf.data_len); 1045 break; 1046 case HAMMER_MREC_TYPE_SKIP: 1047 printf("Skip lo=%08x obj=%016jx key=%016jx rt=%02x to\n" 1048 " lo=%08x obj=%016jx key=%016jx rt=%02x\n", 1049 mrec->skip.skip_beg.localization, 1050 (uintmax_t)mrec->skip.skip_beg.obj_id, 1051 
(uintmax_t)mrec->skip.skip_beg.key, 1052 mrec->skip.skip_beg.rec_type, 1053 mrec->skip.skip_end.localization, 1054 (uintmax_t)mrec->skip.skip_end.obj_id, 1055 (uintmax_t)mrec->skip.skip_end.key, 1056 mrec->skip.skip_end.rec_type); 1057 default: 1058 break; 1059 } 1060 offset += bytes; 1061 } 1062 } 1063 1064 /* 1065 * Read and process the termination sync record. 1066 */ 1067 mrec = read_mrecord(0, &error, &pickup); 1068 if (mrec == NULL || 1069 (mrec->head.type != HAMMER_MREC_TYPE_SYNC && 1070 mrec->head.type != HAMMER_MREC_TYPE_IDLE)) { 1071 fprintf(stderr, "Mirror-dump: Did not get termination " 1072 "sync record\n"); 1073 } 1074 free(mrec); 1075 1076 /* 1077 * Continue with more batches until EOF. 1078 */ 1079 mrec = read_mrecord(0, &error, &pickup); 1080 if (mrec) { 1081 free(mrec); 1082 goto again; 1083 } 1084 free(buf); 1085 } 1086 1087 void 1088 hammer_cmd_mirror_copy(char **av, int ac, int streaming) 1089 { 1090 pid_t pid1; 1091 pid_t pid2; 1092 int fds[2]; 1093 const char *xav[32]; 1094 char tbuf[16]; 1095 char *sh, *user, *host, *rfs; 1096 int xac; 1097 1098 if (ac != 2) 1099 mirror_usage(1); 1100 1101 TwoWayPipeOpt = 1; 1102 signal(SIGPIPE, SIG_IGN); 1103 1104 again: 1105 if (pipe(fds) < 0) { 1106 perror("pipe"); 1107 exit(1); 1108 } 1109 1110 /* 1111 * Source 1112 */ 1113 if ((pid1 = fork()) == 0) { 1114 signal(SIGPIPE, SIG_DFL); 1115 dup2(fds[0], 0); 1116 dup2(fds[0], 1); 1117 close(fds[0]); 1118 close(fds[1]); 1119 if ((rfs = strchr(av[0], ':')) != NULL) { 1120 xac = 0; 1121 1122 if((sh = getenv("HAMMER_RSH")) == NULL) 1123 xav[xac++] = "ssh"; 1124 else 1125 xav[xac++] = sh; 1126 1127 if (CompressOpt) 1128 xav[xac++] = "-C"; 1129 1130 if ((host = strchr(av[0], '@')) != NULL) { 1131 user = strndup( av[0], (host++ - av[0])); 1132 host = strndup( host, (rfs++ - host)); 1133 xav[xac++] = "-l"; 1134 xav[xac++] = user; 1135 xav[xac++] = host; 1136 } 1137 else { 1138 host = strndup( av[0], (rfs++ - av[0])); 1139 user = NULL; 1140 xav[xac++] = host; 1141 } 
1142 1143 1144 if (SshPort) { 1145 xav[xac++] = "-p"; 1146 xav[xac++] = SshPort; 1147 } 1148 1149 xav[xac++] = "hammer"; 1150 1151 switch(VerboseOpt) { 1152 case 0: 1153 break; 1154 case 1: 1155 xav[xac++] = "-v"; 1156 break; 1157 case 2: 1158 xav[xac++] = "-vv"; 1159 break; 1160 default: 1161 xav[xac++] = "-vvv"; 1162 break; 1163 } 1164 if (ForceYesOpt) { 1165 xav[xac++] = "-y"; 1166 } 1167 xav[xac++] = "-2"; 1168 if (TimeoutOpt) { 1169 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt); 1170 xav[xac++] = "-t"; 1171 xav[xac++] = tbuf; 1172 } 1173 if (SplitupOptStr) { 1174 xav[xac++] = "-S"; 1175 xav[xac++] = SplitupOptStr; 1176 } 1177 if (streaming) 1178 xav[xac++] = "mirror-read-stream"; 1179 else 1180 xav[xac++] = "mirror-read"; 1181 xav[xac++] = rfs; 1182 xav[xac++] = NULL; 1183 execvp(*xav, (void *)xav); 1184 } else { 1185 hammer_cmd_mirror_read(av, 1, streaming); 1186 fflush(stdout); 1187 fflush(stderr); 1188 } 1189 _exit(1); 1190 } 1191 1192 /* 1193 * Target 1194 */ 1195 if ((pid2 = fork()) == 0) { 1196 signal(SIGPIPE, SIG_DFL); 1197 dup2(fds[1], 0); 1198 dup2(fds[1], 1); 1199 close(fds[0]); 1200 close(fds[1]); 1201 if ((rfs = strchr(av[1], ':')) != NULL) { 1202 xac = 0; 1203 1204 if((sh = getenv("HAMMER_RSH")) == NULL) 1205 xav[xac++] = "ssh"; 1206 else 1207 xav[xac++] = sh; 1208 1209 if (CompressOpt) 1210 xav[xac++] = "-C"; 1211 1212 if ((host = strchr(av[1], '@')) != NULL) { 1213 user = strndup( av[1], (host++ - av[1])); 1214 host = strndup( host, (rfs++ - host)); 1215 xav[xac++] = "-l"; 1216 xav[xac++] = user; 1217 xav[xac++] = host; 1218 } 1219 else { 1220 host = strndup( av[1], (rfs++ - av[1])); 1221 user = NULL; 1222 xav[xac++] = host; 1223 } 1224 1225 if (SshPort) { 1226 xav[xac++] = "-p"; 1227 xav[xac++] = SshPort; 1228 } 1229 1230 xav[xac++] = "hammer"; 1231 1232 switch(VerboseOpt) { 1233 case 0: 1234 break; 1235 case 1: 1236 xav[xac++] = "-v"; 1237 break; 1238 case 2: 1239 xav[xac++] = "-vv"; 1240 break; 1241 default: 1242 xav[xac++] = "-vvv"; 1243 
break; 1244 } 1245 if (ForceYesOpt) { 1246 xav[xac++] = "-y"; 1247 } 1248 xav[xac++] = "-2"; 1249 xav[xac++] = "mirror-write"; 1250 xav[xac++] = rfs; 1251 xav[xac++] = NULL; 1252 execvp(*xav, (void *)xav); 1253 } else { 1254 hammer_cmd_mirror_write(av + 1, 1); 1255 fflush(stdout); 1256 fflush(stderr); 1257 } 1258 _exit(1); 1259 } 1260 close(fds[0]); 1261 close(fds[1]); 1262 1263 while (waitpid(pid1, NULL, 0) <= 0) 1264 ; 1265 while (waitpid(pid2, NULL, 0) <= 0) 1266 ; 1267 1268 /* 1269 * If the link is lost restart 1270 */ 1271 if (streaming) { 1272 if (VerboseOpt) { 1273 fprintf(stderr, "\nLost Link\n"); 1274 fflush(stderr); 1275 } 1276 sleep(15 + DelayOpt); 1277 goto again; 1278 } 1279 1280 } 1281 1282 /* 1283 * Read and return multiple mrecords 1284 */ 1285 static int 1286 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup) 1287 { 1288 hammer_ioc_mrecord_any_t mrec; 1289 u_int count; 1290 size_t n; 1291 size_t i; 1292 size_t bytes; 1293 int type; 1294 1295 count = 0; 1296 while (size - count >= HAMMER_MREC_HEADSIZE) { 1297 /* 1298 * Cached the record header in case we run out of buffer 1299 * space. 
1300 */ 1301 fflush(stdout); 1302 if (pickup->signature == 0) { 1303 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1304 i = read(fd, (char *)pickup + n, 1305 HAMMER_MREC_HEADSIZE - n); 1306 if (i <= 0) 1307 break; 1308 } 1309 if (n == 0) 1310 break; 1311 if (n != HAMMER_MREC_HEADSIZE) { 1312 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1313 exit(1); 1314 } 1315 if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1316 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1317 "bad signature\n"); 1318 exit(1); 1319 } 1320 } 1321 if (pickup->rec_size < HAMMER_MREC_HEADSIZE || 1322 pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) { 1323 fprintf(stderr, "read_mrecords: malformed record on pipe, " 1324 "illegal rec_size\n"); 1325 exit(1); 1326 } 1327 1328 /* 1329 * Stop if we have insufficient space for the record and data. 1330 */ 1331 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size); 1332 if (size - count < bytes) 1333 break; 1334 1335 /* 1336 * Stop if the record type is not a REC, SKIP, or PASS, 1337 * which are the only types the ioctl supports. Other types 1338 * are used only by the userland protocol. 1339 * 1340 * Ignore all flags. 1341 */ 1342 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK; 1343 if (type != HAMMER_MREC_TYPE_PFSD && 1344 type != HAMMER_MREC_TYPE_REC && 1345 type != HAMMER_MREC_TYPE_SKIP && 1346 type != HAMMER_MREC_TYPE_PASS) { 1347 break; 1348 } 1349 1350 /* 1351 * Read the remainder and clear the pickup signature. 
1352 */ 1353 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) { 1354 i = read(fd, buf + count + n, bytes - n); 1355 if (i <= 0) 1356 break; 1357 } 1358 if (n != bytes) { 1359 fprintf(stderr, "read_mrecords: short read on pipe\n"); 1360 exit(1); 1361 } 1362 1363 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE); 1364 pickup->signature = 0; 1365 pickup->type = 0; 1366 mrec = (void *)(buf + count); 1367 1368 /* 1369 * Validate the completed record 1370 */ 1371 if (mrec->head.rec_crc != 1372 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1373 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1374 fprintf(stderr, "read_mrecords: malformed record " 1375 "on pipe, bad crc\n"); 1376 exit(1); 1377 } 1378 1379 /* 1380 * If its a B-Tree record validate the data crc. 1381 * 1382 * NOTE: If the VFS passes us an explicitly errorde mrec 1383 * we just pass it through. 1384 */ 1385 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK; 1386 1387 if (type == HAMMER_MREC_TYPE_REC) { 1388 if (mrec->head.rec_size < 1389 sizeof(mrec->rec) + mrec->rec.leaf.data_len) { 1390 fprintf(stderr, 1391 "read_mrecords: malformed record on " 1392 "pipe, illegal element data_len\n"); 1393 exit(1); 1394 } 1395 if (mrec->rec.leaf.data_len && 1396 mrec->rec.leaf.data_offset && 1397 hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) { 1398 fprintf(stderr, 1399 "read_mrecords: data_crc did not " 1400 "match data! obj=%016jx key=%016jx\n", 1401 (uintmax_t)mrec->rec.leaf.base.obj_id, 1402 (uintmax_t)mrec->rec.leaf.base.key); 1403 fprintf(stderr, 1404 "continuing, but there are problems\n"); 1405 } 1406 } 1407 count += bytes; 1408 } 1409 return(count); 1410 } 1411 1412 /* 1413 * Read and return a single mrecord. 
1414 */ 1415 static 1416 hammer_ioc_mrecord_any_t 1417 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup) 1418 { 1419 hammer_ioc_mrecord_any_t mrec; 1420 struct hammer_ioc_mrecord_head mrechd; 1421 size_t bytes; 1422 size_t n; 1423 size_t i; 1424 1425 if (pickup && pickup->type != 0) { 1426 mrechd = *pickup; 1427 pickup->signature = 0; 1428 pickup->type = 0; 1429 n = HAMMER_MREC_HEADSIZE; 1430 } else { 1431 /* 1432 * Read in the PFSD header from the sender. 1433 */ 1434 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) { 1435 i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n); 1436 if (i <= 0) 1437 break; 1438 } 1439 if (n == 0) { 1440 *errorp = 0; /* EOF */ 1441 return(NULL); 1442 } 1443 if (n != HAMMER_MREC_HEADSIZE) { 1444 fprintf(stderr, "short read of mrecord header\n"); 1445 *errorp = EPIPE; 1446 return(NULL); 1447 } 1448 } 1449 if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) { 1450 fprintf(stderr, "read_mrecord: bad signature\n"); 1451 *errorp = EINVAL; 1452 return(NULL); 1453 } 1454 bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size); 1455 assert(bytes >= sizeof(mrechd)); 1456 mrec = malloc(bytes); 1457 mrec->head = mrechd; 1458 1459 while (n < bytes) { 1460 i = read(fdin, (char *)mrec + n, bytes - n); 1461 if (i <= 0) 1462 break; 1463 n += i; 1464 } 1465 if (n != bytes) { 1466 fprintf(stderr, "read_mrecord: short read on payload\n"); 1467 *errorp = EPIPE; 1468 return(NULL); 1469 } 1470 if (mrec->head.rec_crc != 1471 crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1472 mrec->head.rec_size - HAMMER_MREC_CRCOFF)) { 1473 fprintf(stderr, "read_mrecord: bad CRC\n"); 1474 *errorp = EINVAL; 1475 return(NULL); 1476 } 1477 *errorp = 0; 1478 return(mrec); 1479 } 1480 1481 static 1482 void 1483 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec, 1484 int bytes) 1485 { 1486 char zbuf[HAMMER_HEAD_ALIGN]; 1487 int pad; 1488 1489 pad = HAMMER_HEAD_DOALIGN(bytes) - bytes; 1490 1491 assert(bytes >= (int)sizeof(mrec->head)); 1492 
bzero(&mrec->head, sizeof(mrec->head)); 1493 mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE; 1494 mrec->head.type = type; 1495 mrec->head.rec_size = bytes; 1496 mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF, 1497 bytes - HAMMER_MREC_CRCOFF); 1498 if (write(fdout, mrec, bytes) != bytes) { 1499 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1500 errno, strerror(errno)); 1501 exit(1); 1502 } 1503 if (pad) { 1504 bzero(zbuf, pad); 1505 if (write(fdout, zbuf, pad) != pad) { 1506 fprintf(stderr, "write_mrecord: error %d (%s)\n", 1507 errno, strerror(errno)); 1508 exit(1); 1509 } 1510 } 1511 } 1512 1513 /* 1514 * Generate a mirroring header with the pfs information of the 1515 * originating filesytem. 1516 */ 1517 static void 1518 generate_mrec_header(int fd, int pfs_id, 1519 union hammer_ioc_mrecord_any *mrec_tmp) 1520 { 1521 struct hammer_ioc_pseudofs_rw pfs; 1522 1523 bzero(&pfs, sizeof(pfs)); 1524 bzero(mrec_tmp, sizeof(*mrec_tmp)); 1525 pfs.pfs_id = pfs_id; 1526 pfs.ondisk = &mrec_tmp->pfs.pfsd; 1527 pfs.bytes = sizeof(mrec_tmp->pfs.pfsd); 1528 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1529 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n"); 1530 exit(1); 1531 } 1532 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1533 fprintf(stderr, "Mirror-read: HAMMER PFS version mismatch!\n"); 1534 exit(1); 1535 } 1536 mrec_tmp->pfs.version = pfs.version; 1537 } 1538 1539 /* 1540 * Validate the pfs information from the originating filesystem 1541 * against the target filesystem. shared_uuid must match. 
1542 * 1543 * return -1 if we got a TERM record 1544 */ 1545 static int 1546 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id, 1547 struct hammer_ioc_mrecord_head *pickup, 1548 hammer_tid_t *tid_begp, hammer_tid_t *tid_endp) 1549 { 1550 struct hammer_ioc_pseudofs_rw pfs; 1551 struct hammer_pseudofs_data pfsd; 1552 hammer_ioc_mrecord_any_t mrec; 1553 int error; 1554 1555 /* 1556 * Get the PFSD info from the target filesystem. 1557 */ 1558 bzero(&pfs, sizeof(pfs)); 1559 bzero(&pfsd, sizeof(pfsd)); 1560 pfs.pfs_id = pfs_id; 1561 pfs.ondisk = &pfsd; 1562 pfs.bytes = sizeof(pfsd); 1563 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1564 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n"); 1565 exit(1); 1566 } 1567 if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) { 1568 fprintf(stderr, "mirror-write: HAMMER PFS version mismatch!\n"); 1569 exit(1); 1570 } 1571 1572 mrec = read_mrecord(fdin, &error, pickup); 1573 if (mrec == NULL) { 1574 if (error == 0) 1575 fprintf(stderr, "validate_mrec_header: short read\n"); 1576 exit(1); 1577 } 1578 if (mrec->head.type == HAMMER_MREC_TYPE_TERM) { 1579 free(mrec); 1580 return(-1); 1581 } 1582 1583 if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) { 1584 fprintf(stderr, "validate_mrec_header: did not get expected " 1585 "PFSD record type\n"); 1586 exit(1); 1587 } 1588 if (mrec->head.rec_size != sizeof(mrec->pfs)) { 1589 fprintf(stderr, "validate_mrec_header: unexpected payload " 1590 "size\n"); 1591 exit(1); 1592 } 1593 if (mrec->pfs.version != pfs.version) { 1594 fprintf(stderr, "validate_mrec_header: Version mismatch\n"); 1595 exit(1); 1596 } 1597 1598 /* 1599 * Whew. Ok, is the read PFS info compatible with the target? 
1600 */ 1601 if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid, 1602 sizeof(pfsd.shared_uuid)) != 0) { 1603 fprintf(stderr, 1604 "mirror-write: source and target have " 1605 "different shared-uuid's!\n"); 1606 exit(1); 1607 } 1608 if (is_target && hammer_is_pfs_master(&pfsd)) { 1609 fprintf(stderr, "mirror-write: target must be in slave mode\n"); 1610 exit(1); 1611 } 1612 if (tid_begp) 1613 *tid_begp = mrec->pfs.pfsd.sync_beg_tid; 1614 if (tid_endp) 1615 *tid_endp = mrec->pfs.pfsd.sync_end_tid; 1616 free(mrec); 1617 return(0); 1618 } 1619 1620 static void 1621 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id) 1622 { 1623 struct hammer_ioc_pseudofs_rw pfs; 1624 struct hammer_pseudofs_data pfsd; 1625 1626 bzero(&pfs, sizeof(pfs)); 1627 bzero(&pfsd, sizeof(pfsd)); 1628 pfs.pfs_id = pfs_id; 1629 pfs.ondisk = &pfsd; 1630 pfs.bytes = sizeof(pfsd); 1631 if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) { 1632 perror("update_pfs_snapshot (read)"); 1633 exit(1); 1634 } 1635 if (pfsd.sync_end_tid != snapshot_tid) { 1636 pfsd.sync_end_tid = snapshot_tid; 1637 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) { 1638 perror("update_pfs_snapshot (rewrite)"); 1639 exit(1); 1640 } 1641 if (VerboseOpt >= 2) { 1642 fprintf(stderr, 1643 "Mirror-write: Completed, updated snapshot " 1644 "to %016jx\n", 1645 (uintmax_t)snapshot_tid); 1646 fflush(stderr); 1647 } 1648 } 1649 } 1650 1651 /* 1652 * Bandwidth-limited write in chunks 1653 */ 1654 static 1655 ssize_t 1656 writebw(int fd, const void *buf, size_t nbytes, 1657 uint64_t *bwcount, struct timeval *tv1) 1658 { 1659 struct timeval tv2; 1660 size_t n; 1661 ssize_t r; 1662 ssize_t a; 1663 int usec; 1664 1665 a = 0; 1666 r = 0; 1667 while (nbytes) { 1668 if (*bwcount + nbytes > BandwidthOpt) 1669 n = BandwidthOpt - *bwcount; 1670 else 1671 n = nbytes; 1672 if (n) 1673 r = write(fd, buf, n); 1674 if (r >= 0) { 1675 a += r; 1676 nbytes -= r; 1677 buf = (const char *)buf + r; 1678 } 1679 if ((size_t)r != n) 1680 
break; 1681 *bwcount += n; 1682 if (*bwcount >= BandwidthOpt) { 1683 gettimeofday(&tv2, NULL); 1684 usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 + 1685 (int)(tv2.tv_usec - tv1->tv_usec); 1686 if (usec >= 0 && usec < 1000000) 1687 usleep(1000000 - usec); 1688 gettimeofday(tv1, NULL); 1689 *bwcount -= BandwidthOpt; 1690 } 1691 } 1692 return(a ? a : r); 1693 } 1694 1695 /* 1696 * Get a yes or no answer from the terminal. The program may be run as 1697 * part of a two-way pipe so we cannot use stdin for this operation. 1698 */ 1699 static int 1700 getyntty(void) 1701 { 1702 char buf[256]; 1703 FILE *fp; 1704 int result; 1705 1706 fp = fopen("/dev/tty", "r"); 1707 if (fp == NULL) { 1708 fprintf(stderr, "No terminal for response\n"); 1709 return(-1); 1710 } 1711 result = -1; 1712 while (fgets(buf, sizeof(buf), fp) != NULL) { 1713 if (buf[0] == 'y' || buf[0] == 'Y') { 1714 result = 1; 1715 break; 1716 } 1717 if (buf[0] == 'n' || buf[0] == 'N') { 1718 result = 0; 1719 break; 1720 } 1721 fprintf(stderr, "Response not understood\n"); 1722 break; 1723 } 1724 fclose(fp); 1725 return(result); 1726 } 1727 1728 static void 1729 score_printf(size_t i, size_t w, const char *ctl, ...) 
1730 { 1731 va_list va; 1732 size_t n; 1733 static size_t SSize; 1734 static int SFd = -1; 1735 static char ScoreBuf[1024]; 1736 1737 if (ScoreBoardFile == NULL) 1738 return; 1739 assert(i + w < sizeof(ScoreBuf)); 1740 if (SFd < 0) { 1741 SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644); 1742 if (SFd < 0) 1743 return; 1744 SSize = 0; 1745 } 1746 for (n = 0; n < i; ++n) { 1747 if (ScoreBuf[n] == 0) 1748 ScoreBuf[n] = ' '; 1749 } 1750 va_start(va, ctl); 1751 vsnprintf(ScoreBuf + i, w - 1, ctl, va); 1752 va_end(va); 1753 n = strlen(ScoreBuf + i); 1754 while (n < w - 1) { 1755 ScoreBuf[i + n] = ' '; 1756 ++n; 1757 } 1758 ScoreBuf[i + n] = '\n'; 1759 if (SSize < i + w) 1760 SSize = i + w; 1761 pwrite(SFd, ScoreBuf, SSize, 0); 1762 } 1763 1764 static void 1765 hammer_check_restrict(const char *filesystem) 1766 { 1767 size_t rlen; 1768 int atslash; 1769 1770 if (RestrictTarget == NULL) 1771 return; 1772 rlen = strlen(RestrictTarget); 1773 if (strncmp(filesystem, RestrictTarget, rlen) != 0) { 1774 fprintf(stderr, "hammer-remote: restricted target\n"); 1775 exit(1); 1776 } 1777 atslash = 1; 1778 while (filesystem[rlen]) { 1779 if (atslash && 1780 filesystem[rlen] == '.' && 1781 filesystem[rlen+1] == '.') { 1782 fprintf(stderr, "hammer-remote: '..' not allowed\n"); 1783 exit(1); 1784 } 1785 if (filesystem[rlen] == '/') 1786 atslash = 1; 1787 else 1788 atslash = 0; 1789 ++rlen; 1790 } 1791 } 1792 1793 static void 1794 mirror_usage(int code) 1795 { 1796 fprintf(stderr, 1797 "hammer mirror-read <filesystem> [begin-tid]\n" 1798 "hammer mirror-read-stream <filesystem> [begin-tid]\n" 1799 "hammer mirror-write <filesystem>\n" 1800 "hammer mirror-dump [header]\n" 1801 "hammer mirror-copy [[user@]host:]<filesystem>" 1802 " [[user@]host:]<filesystem>\n" 1803 "hammer mirror-stream [[user@]host:]<filesystem>" 1804 " [[user@]host:]<filesystem>\n" 1805 ); 1806 exit(code); 1807 } 1808 1809