1 /* $NetBSD: tape.c,v 1.57 2021/06/19 13:56:34 christos Exp $ */ 2 3 /*- 4 * Copyright (c) 1980, 1991, 1993 5 * The Regents of the University of California. All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. Neither the name of the University nor the names of its contributors 16 * may be used to endorse or promote products derived from this software 17 * without specific prior written permission. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND 20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 22 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE 23 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 24 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 25 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 26 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 27 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 28 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 29 * SUCH DAMAGE. 
30 */ 31 32 #include <sys/cdefs.h> 33 #ifndef lint 34 #if 0 35 static char sccsid[] = "@(#)tape.c 8.4 (Berkeley) 5/1/95"; 36 #else 37 __RCSID("$NetBSD: tape.c,v 1.57 2021/06/19 13:56:34 christos Exp $"); 38 #endif 39 #endif /* not lint */ 40 41 #include <sys/param.h> 42 #include <sys/socket.h> 43 #include <sys/time.h> 44 #include <sys/wait.h> 45 #include <sys/ioctl.h> 46 #include <sys/mtio.h> 47 48 #include <errno.h> 49 #include <fcntl.h> 50 #include <signal.h> 51 #include <stdio.h> 52 #include <stdlib.h> 53 #include <string.h> 54 #include <time.h> 55 #include <unistd.h> 56 57 #include "dump.h" 58 #include "pathnames.h" 59 60 int writesize; /* size of malloc()ed buffer for tape */ 61 int64_t lastspclrec = -1; /* tape block number of last written header */ 62 int trecno = 0; /* next record to write in current block */ 63 extern long blocksperfile; /* number of blocks per output file */ 64 long blocksthisvol; /* number of blocks on current output file */ 65 extern int ntrec; /* blocking factor on tape */ 66 extern int cartridge; 67 extern const char *host; 68 char *nexttape; 69 70 static ssize_t atomic_read(int, void *, int); 71 static ssize_t atomic_write(int, const void *, int); 72 static void doworker(int, int); 73 static void create_workers(void); 74 static void flushtape(void); 75 static void killall(void); 76 static void proceed(int); 77 static void rollforward(void); 78 static void sigpipe(int); 79 static void tperror(int); 80 81 /* 82 * Concurrent dump mods (Caltech) - disk block reading and tape writing 83 * are exported to several worker processes. While one worker writes the 84 * tape, the others read disk blocks; they pass control of the tape in 85 * a ring via signals. The parent process traverses the file system and 86 * sends writeheader()'s and lists of daddr's to the workers via pipes. 87 * The following structure defines the instruction packets sent to workers. 
88 */ 89 struct req { 90 daddr_t dblk; 91 int count; 92 }; 93 int reqsiz; 94 95 #define WORKERS 3 /* 1 worker writing, 1 reading, 1 for slack */ 96 struct worker { 97 int64_t tapea; /* header number at start of this chunk */ 98 int64_t firstrec; /* record number of this block */ 99 int count; /* count to next header (used for TS_TAPE */ 100 /* after EOT) */ 101 int inode; /* inode that we are currently dealing with */ 102 int fd; /* FD for this worker */ 103 int pid; /* PID for this worker */ 104 int sent; /* 1 == we've sent this worker requests */ 105 char (*tblock)[TP_BSIZE]; /* buffer for data blocks */ 106 struct req *req; /* buffer for requests */ 107 } workers[WORKERS+1]; 108 struct worker *wp; 109 110 char (*nextblock)[TP_BSIZE]; 111 112 static int64_t tapea_volume; /* value of spcl.c_tapea at volume start */ 113 114 int master; /* pid of master, for sending error signals */ 115 int tenths; /* length of tape used per block written */ 116 static volatile sig_atomic_t caught; /* have we caught the signal to proceed? */ 117 118 int 119 alloctape(void) 120 { 121 int pgoff = getpagesize() - 1; 122 char *buf; 123 int i; 124 125 writesize = ntrec * TP_BSIZE; 126 reqsiz = (ntrec + 1) * sizeof(struct req); 127 /* 128 * CDC 92181's and 92185's make 0.8" gaps in 1600-bpi start/stop mode 129 * (see DEC TU80 User's Guide). The shorter gaps of 6250-bpi require 130 * repositioning after stopping, i.e, streaming mode, where the gap is 131 * variable, 0.30" to 0.45". The gap is maximal when the tape stops. 132 */ 133 if (blocksperfile == 0 && !unlimited) 134 tenths = writesize / density + 135 (cartridge ? 16 : density == 625 ? 5 : 8); 136 /* 137 * Allocate tape buffer contiguous with the array of instruction 138 * packets, so flushtape() can write them together with one write(). 139 * Align tape buffer on page boundary to speed up tape write(). 
140 */ 141 for (i = 0; i <= WORKERS; i++) { 142 buf = (char *) 143 xmalloc((unsigned)(reqsiz + writesize + pgoff + TP_BSIZE)); 144 workers[i].tblock = (char (*)[TP_BSIZE]) 145 (((long)&buf[ntrec + 1] + pgoff) &~ pgoff); 146 workers[i].req = (struct req *)workers[i].tblock - ntrec - 1; 147 } 148 wp = &workers[0]; 149 wp->count = 1; 150 wp->tapea = 0; 151 wp->firstrec = 0; 152 nextblock = wp->tblock; 153 return(1); 154 } 155 156 void 157 writerec(const char *dp, int isspcl) 158 { 159 160 wp->req[trecno].dblk = (daddr_t)0; 161 wp->req[trecno].count = 1; 162 *(union u_spcl *)(*(nextblock)++) = *(const union u_spcl *)dp; 163 if (isspcl) 164 lastspclrec = iswap64(spcl.c_tapea); 165 trecno++; 166 spcl.c_tapea = iswap64(iswap64(spcl.c_tapea) +1); 167 if (trecno >= ntrec) 168 flushtape(); 169 } 170 171 void 172 dumpblock(daddr_t blkno, int size) 173 { 174 int avail, tpblks; 175 daddr_t dblkno; 176 177 dblkno = fsatoda(ufsib, blkno); 178 tpblks = size >> tp_bshift; 179 while ((avail = MIN(tpblks, ntrec - trecno)) > 0) { 180 wp->req[trecno].dblk = dblkno; 181 wp->req[trecno].count = avail; 182 trecno += avail; 183 spcl.c_tapea = iswap64(iswap64(spcl.c_tapea) + avail); 184 if (trecno >= ntrec) 185 flushtape(); 186 dblkno += avail << (tp_bshift - dev_bshift); 187 tpblks -= avail; 188 } 189 } 190 191 int nogripe = 0; 192 193 static void 194 tperror(int signo __unused) 195 { 196 197 if (pipeout) { 198 msg("write error on %s\n", tape); 199 quit("Cannot recover"); 200 /* NOTREACHED */ 201 } 202 msg("write error %ld blocks into volume %d\n", blocksthisvol, tapeno); 203 broadcast("DUMP WRITE ERROR!\n"); 204 if (!query("Do you want to restart?")) 205 dumpabort(0); 206 msg("Closing this volume. 
Prepare to restart with new media;\n"); 207 msg("this dump volume will be rewritten.\n"); 208 killall(); 209 nogripe = 1; 210 close_rewind(); 211 Exit(X_REWRITE); 212 } 213 214 static void 215 sigpipe(int signo __unused) 216 { 217 218 quit("Broken pipe"); 219 } 220 221 /* 222 * do_stats -- 223 * Update xferrate stats 224 */ 225 time_t 226 do_stats(void) 227 { 228 time_t tnow, ttaken; 229 int64_t blocks; 230 231 (void)time(&tnow); 232 ttaken = tnow - tstart_volume; 233 blocks = iswap64(spcl.c_tapea) - tapea_volume; 234 msg("Volume %d completed at: %s", tapeno, ctime(&tnow)); 235 if (ttaken > 0) { 236 msg("Volume %d took %d:%02d:%02d\n", tapeno, 237 (int) (ttaken / 3600), (int) ((ttaken % 3600) / 60), 238 (int) (ttaken % 60)); 239 msg("Volume %d transfer rate: %d KB/s\n", tapeno, 240 (int) (blocks / ttaken)); 241 xferrate += blocks / ttaken; 242 } 243 return(tnow); 244 } 245 246 /* 247 * statussig -- 248 * information message upon receipt of SIGINFO 249 * (derived from optr.c::timeest()) 250 */ 251 void 252 statussig(int notused __unused) 253 { 254 time_t tnow, deltat; 255 char msgbuf[128]; 256 int errno_save; 257 258 if (blockswritten < 500) 259 return; 260 errno_save = errno; 261 (void) time((time_t *) &tnow); 262 if (tnow <= tstart_volume) 263 return; 264 deltat = tstart_writing - tnow + 265 (1.0 * (tnow - tstart_writing)) / blockswritten * tapesize; 266 (void)snprintf(msgbuf, sizeof(msgbuf), 267 "%3.2f%% done at %ld KB/s, finished in %d:%02d\n", 268 (blockswritten * 100.0) / tapesize, 269 (long)((iswap64(spcl.c_tapea) - tapea_volume) / 270 (tnow - tstart_volume)), 271 (int)(deltat / 3600), (int)((deltat % 3600) / 60)); 272 write(STDERR_FILENO, msgbuf, strlen(msgbuf)); 273 errno = errno_save; 274 } 275 276 static void 277 flushtape(void) 278 { 279 int i, blks, got; 280 int64_t lastfirstrec; 281 282 int siz = (char *)nextblock - (char *)wp->req; 283 284 wp->req[trecno].count = 0; /* Sentinel */ 285 286 if (atomic_write(wp->fd, wp->req, siz) != siz) 287 quite(errno, 
"error writing command pipe"); 288 wp->sent = 1; /* we sent a request, read the response later */ 289 290 lastfirstrec = wp->firstrec; 291 292 if (++wp >= &workers[WORKERS]) 293 wp = &workers[0]; 294 295 /* Read results back from next worker */ 296 if (wp->sent) { 297 if (atomic_read(wp->fd, &got, sizeof got) 298 != sizeof got) { 299 perror(" DUMP: error reading command pipe in master"); 300 dumpabort(0); 301 } 302 wp->sent = 0; 303 304 /* Check for end of tape */ 305 if (got < writesize) { 306 msg("End of tape detected\n"); 307 308 /* 309 * Drain the results, don't care what the values were. 310 * If we read them here then trewind won't... 311 */ 312 for (i = 0; i < WORKERS; i++) { 313 if (workers[i].sent) { 314 if (atomic_read(workers[i].fd, 315 &got, sizeof got) 316 != sizeof got) { 317 perror(" DUMP: error reading command pipe in master"); 318 dumpabort(0); 319 } 320 workers[i].sent = 0; 321 } 322 } 323 324 close_rewind(); 325 rollforward(); 326 return; 327 } 328 } 329 330 blks = 0; 331 if (iswap32(spcl.c_type) == TS_INODE || 332 iswap32(spcl.c_type) == TS_ADDR) { 333 for (i = 0; i < iswap32(spcl.c_count); i++) 334 if (spcl.c_addr[i] != 0) 335 blks++; 336 } 337 wp->count = lastspclrec + blks + 1 - iswap64(spcl.c_tapea); 338 wp->tapea = iswap64(spcl.c_tapea); 339 wp->firstrec = lastfirstrec + ntrec; 340 wp->inode = curino; 341 nextblock = wp->tblock; 342 trecno = 0; 343 asize += tenths; 344 blockswritten += ntrec; 345 blocksthisvol += ntrec; 346 if (!pipeout && !unlimited && (blocksperfile ? 
347 (blocksthisvol >= blocksperfile) : (asize > tsize))) { 348 close_rewind(); 349 startnewtape(0); 350 } 351 timeest(); 352 } 353 354 void 355 trewind(int eject) 356 { 357 int f; 358 int got; 359 360 for (f = 0; f < WORKERS; f++) { 361 /* 362 * Drain the results, but unlike EOT we DO (or should) care 363 * what the return values were, since if we detect EOT after 364 * we think we've written the last blocks to the tape anyway, 365 * we have to replay those blocks with rollforward. 366 * 367 * fixme: punt for now. 368 */ 369 if (workers[f].sent) { 370 if (atomic_read(workers[f].fd, &got, sizeof got) 371 != sizeof got) { 372 perror(" DUMP: error reading command pipe in master"); 373 dumpabort(0); 374 } 375 workers[f].sent = 0; 376 if (got != writesize) { 377 msg("EOT detected in last 2 tape records!\n"); 378 msg("Use a longer tape, decrease the size estimate\n"); 379 quit("or use no size estimate at all"); 380 } 381 } 382 (void) close(workers[f].fd); 383 } 384 while (wait(NULL) >= 0) /* wait for any signals from workers */ 385 /* void */; 386 387 if (pipeout) 388 return; 389 390 msg("Closing %s\n", tape); 391 392 #ifdef RDUMP 393 if (host) { 394 rmtclose(); 395 while (rmtopen(tape, 0, 0) < 0) 396 sleep(10); 397 if (eflag && eject) { 398 msg("Ejecting %s\n", tape); 399 (void) rmtioctl(MTOFFL, 0); 400 } 401 rmtclose(); 402 return; 403 } 404 #endif 405 (void) close(tapefd); 406 while ((f = open(tape, 0)) < 0) 407 sleep (10); 408 if (eflag && eject) { 409 struct mtop offl; 410 411 msg("Ejecting %s\n", tape); 412 offl.mt_op = MTOFFL; 413 offl.mt_count = 0; 414 (void) ioctl(f, MTIOCTOP, &offl); 415 } 416 (void) close(f); 417 } 418 419 void 420 close_rewind(void) 421 { 422 int i, f; 423 424 trewind(1); 425 (void)do_stats(); 426 if (nexttape) 427 return; 428 if (!nogripe) { 429 msg("Change Volumes: Mount volume #%d\n", tapeno+1); 430 broadcast("CHANGE DUMP VOLUMES!\a\a\n"); 431 } 432 if (lflag) { 433 for (i = 0; i < lflag / 10; i++) { /* wait lflag seconds */ 434 if (host) 
{ 435 if (rmtopen(tape, 0, 0) >= 0) { 436 rmtclose(); 437 return; 438 } 439 } else { 440 if ((f = open(tape, 0)) >= 0) { 441 close(f); 442 return; 443 } 444 } 445 sleep (10); 446 } 447 } 448 449 while (!query("Is the new volume mounted and ready to go?")) 450 if (query("Do you want to abort?")) { 451 dumpabort(0); 452 /*NOTREACHED*/ 453 } 454 } 455 456 void 457 rollforward(void) 458 { 459 struct req *p, *q, *prev; 460 struct worker *twp; 461 int i, size, got; 462 int64_t savedtapea; 463 union u_spcl *ntb, *otb; 464 twp = &workers[WORKERS]; 465 ntb = (union u_spcl *)twp->tblock[1]; 466 467 /* 468 * Each of the N workers should have requests that need to 469 * be replayed on the next tape. Use the extra worker buffers 470 * (workers[WORKERS]) to construct request lists to be sent to 471 * each worker in turn. 472 */ 473 for (i = 0; i < WORKERS; i++) { 474 q = &twp->req[1]; 475 otb = (union u_spcl *)wp->tblock; 476 477 /* 478 * For each request in the current worker, copy it to twp. 479 */ 480 481 prev = NULL; 482 for (p = wp->req; p->count > 0; p += p->count) { 483 *q = *p; 484 if (p->dblk == 0) 485 *ntb++ = *otb++; /* copy the datablock also */ 486 prev = q; 487 q += q->count; 488 } 489 if (prev == NULL) 490 quit("rollforward: protocol botch"); 491 if (prev->dblk != 0) 492 prev->count -= 1; 493 else 494 ntb--; 495 q -= 1; 496 q->count = 0; 497 q = &twp->req[0]; 498 if (i == 0) { 499 q->dblk = 0; 500 q->count = 1; 501 trecno = 0; 502 nextblock = twp->tblock; 503 savedtapea = iswap64(spcl.c_tapea); 504 spcl.c_tapea = iswap64(wp->tapea); 505 startnewtape(0); 506 spcl.c_tapea = iswap64(savedtapea); 507 lastspclrec = savedtapea - 1; 508 } 509 size = (char *)ntb - (char *)q; 510 if (atomic_write(wp->fd, q, size) != size) { 511 perror(" DUMP: error writing command pipe"); 512 dumpabort(0); 513 } 514 wp->sent = 1; 515 if (++wp >= &workers[WORKERS]) 516 wp = &workers[0]; 517 518 q->count = 1; 519 520 if (prev->dblk != 0) { 521 /* 522 * If the last one was a disk block, make 
the 523 * first of this one be the last bit of that disk 524 * block... 525 */ 526 q->dblk = prev->dblk + 527 prev->count * (TP_BSIZE / DEV_BSIZE); 528 ntb = (union u_spcl *)twp->tblock; 529 } else { 530 /* 531 * It wasn't a disk block. Copy the data to its 532 * new location in the buffer. 533 */ 534 q->dblk = 0; 535 *((union u_spcl *)twp->tblock) = *ntb; 536 ntb = (union u_spcl *)twp->tblock[1]; 537 } 538 } 539 wp->req[0] = *q; 540 nextblock = wp->tblock; 541 if (q->dblk == 0) 542 nextblock++; 543 trecno = 1; 544 545 /* 546 * Clear the first workers' response. One hopes that it 547 * worked ok, otherwise the tape is much too short! 548 */ 549 if (wp->sent) { 550 if (atomic_read(wp->fd, &got, sizeof got) 551 != sizeof got) { 552 perror(" DUMP: error reading command pipe in master"); 553 dumpabort(0); 554 } 555 wp->sent = 0; 556 557 if (got != writesize) { 558 quit("EOT detected at start of the tape"); 559 } 560 } 561 } 562 563 /* 564 * We implement taking and restoring checkpoints on the tape level. 565 * When each tape is opened, a new process is created by forking; this 566 * saves all of the necessary context in the parent. The child 567 * continues the dump; the parent waits around, saving the context. 568 * If the child returns X_REWRITE, then it had problems writing that tape; 569 * this causes the parent to fork again, duplicating the context, and 570 * everything continues as if nothing had happened. 571 */ 572 void 573 startnewtape(int top) 574 { 575 int parentpid; 576 int childpid; 577 int status; 578 int waitforpid; 579 char *p; 580 sig_t interrupt_save; 581 582 interrupt_save = signal(SIGINT, SIG_IGN); 583 parentpid = getpid(); 584 tapea_volume = iswap64(spcl.c_tapea); 585 (void)time(&tstart_volume); 586 587 restore_check_point: 588 (void)signal(SIGINT, interrupt_save); 589 /* 590 * All signals are inherited... 
591 */ 592 childpid = fork(); 593 if (childpid < 0) { 594 msg("Context save fork fails in parent %d\n", parentpid); 595 Exit(X_ABORT); 596 } 597 if (childpid != 0) { 598 /* 599 * PARENT: 600 * save the context by waiting 601 * until the child doing all of the work returns. 602 * don't catch the interrupt 603 */ 604 signal(SIGINT, SIG_IGN); 605 signal(SIGINFO, SIG_IGN); /* only want child's stats */ 606 #ifdef TDEBUG 607 msg("Tape: %d; parent process: %d child process %d\n", 608 tapeno+1, parentpid, childpid); 609 #endif /* TDEBUG */ 610 while ((waitforpid = wait(&status)) != childpid) 611 msg("Parent %d waiting for child %d has another child %d return\n", 612 parentpid, childpid, waitforpid); 613 if (status & 0xFF) { 614 msg("Child %d returns LOB status %o\n", 615 childpid, status&0xFF); 616 } 617 status = (status >> 8) & 0xFF; 618 #ifdef TDEBUG 619 switch(status) { 620 case X_FINOK: 621 msg("Child %d finishes X_FINOK\n", childpid); 622 break; 623 case X_ABORT: 624 msg("Child %d finishes X_ABORT\n", childpid); 625 break; 626 case X_REWRITE: 627 msg("Child %d finishes X_REWRITE\n", childpid); 628 break; 629 default: 630 msg("Child %d finishes unknown %d\n", 631 childpid, status); 632 break; 633 } 634 #endif /* TDEBUG */ 635 switch(status) { 636 case X_FINOK: 637 Exit(X_FINOK); 638 case X_ABORT: 639 Exit(X_ABORT); 640 case X_REWRITE: 641 goto restore_check_point; 642 default: 643 msg("Bad return code from dump: %d\n", status); 644 Exit(X_ABORT); 645 } 646 /*NOTREACHED*/ 647 } else { /* we are the child; just continue */ 648 signal(SIGINFO, statussig); /* now want child's stats */ 649 #ifdef TDEBUG 650 sleep(4); /* allow time for parent's message to get out */ 651 msg("Child on Tape %d has parent %d, my pid = %d\n", 652 tapeno+1, parentpid, getpid()); 653 #endif /* TDEBUG */ 654 /* 655 * If we have a name like "/dev/rst0,/dev/rst1", 656 * use the name before the comma first, and save 657 * the remaining names for subsequent volumes. 
658 */ 659 tapeno++; /* current tape sequence */ 660 if (nexttape || strchr(tape, ',')) { 661 if (nexttape && *nexttape) 662 tape = nexttape; 663 if ((p = strchr(tape, ',')) != NULL) { 664 *p = '\0'; 665 nexttape = p + 1; 666 } else 667 nexttape = NULL; 668 msg("Dumping volume %d on %s\n", tapeno, tape); 669 } 670 #ifdef RDUMP 671 while ((tapefd = (host ? rmtopen(tape, 2, 1) : 672 pipeout ? 1 : open(tape, O_WRONLY|O_CREAT, 0666))) < 0) 673 #else 674 while ((tapefd = (pipeout ? 1 : 675 open(tape, O_WRONLY|O_CREAT, 0666))) < 0) 676 #endif 677 { 678 msg("Cannot open output \"%s\".\n", tape); 679 if (!query("Do you want to retry the open?")) 680 dumpabort(0); 681 } 682 683 create_workers(); /* Share open tape file descriptor with workers */ 684 685 asize = 0; 686 blocksthisvol = 0; 687 if (top) 688 newtape++; /* new tape signal */ 689 spcl.c_count = iswap32(wp->count); 690 /* 691 * measure firstrec in TP_BSIZE units since restore doesn't 692 * know the correct ntrec value... 693 */ 694 spcl.c_firstrec = iswap32(wp->firstrec); 695 spcl.c_volume = iswap32(iswap32(spcl.c_volume) + 1); 696 spcl.c_type = iswap32(TS_TAPE); 697 if (!is_ufs2) 698 spcl.c_flags = iswap32(iswap32(spcl.c_flags) 699 | DR_NEWHEADER); 700 writeheader((ino_t)wp->inode); 701 if (!is_ufs2) 702 spcl.c_flags = iswap32(iswap32(spcl.c_flags) & 703 ~ DR_NEWHEADER); 704 msg("Volume %d started at: %s", tapeno, ctime(&tstart_volume)); 705 if (tapeno > 1) 706 msg("Volume %d begins with blocks from inode %d\n", 707 tapeno, wp->inode); 708 } 709 } 710 711 void 712 dumpabort(int signo __unused) 713 { 714 715 if (master != 0 && master != getpid()) 716 /* Signals master to call dumpabort */ 717 (void) kill(master, SIGTERM); 718 else { 719 #ifdef DUMP_LFS 720 lfs_wrap_go(); 721 #endif 722 killall(); 723 msg("The ENTIRE dump is aborted.\n"); 724 } 725 #ifdef RDUMP 726 rmtclose(); 727 #endif 728 Exit(X_ABORT); 729 } 730 731 void 732 Exit(int status) 733 { 734 735 #ifdef TDEBUG 736 msg("pid = %d exits with status %d\n", 
getpid(), status); 737 #endif /* TDEBUG */ 738 exit(status); 739 } 740 741 /* 742 * proceed - handler for SIGUSR2, used to synchronize IO between the workers. 743 */ 744 static void 745 proceed(int signo __unused) 746 { 747 caught++; 748 } 749 750 void 751 create_workers(void) 752 { 753 int cmd[2]; 754 int i, j; 755 756 master = getpid(); 757 758 signal(SIGTERM, dumpabort); /* Slave sends SIGTERM on dumpabort() */ 759 signal(SIGPIPE, sigpipe); 760 signal(SIGUSR1, tperror); /* Slave sends SIGUSR1 on tape errors */ 761 signal(SIGUSR2, proceed); /* Slave sends SIGUSR2 to next worker */ 762 763 for (i = 0; i < WORKERS; i++) { 764 if (i == wp - &workers[0]) { 765 caught = 1; 766 } else { 767 caught = 0; 768 } 769 770 if (socketpair(AF_LOCAL, SOCK_STREAM, 0, cmd) < 0 || 771 (workers[i].pid = fork()) < 0) 772 quite(errno, "too many workers, %d (recompile smaller)", 773 i); 774 775 workers[i].fd = cmd[1]; 776 workers[i].sent = 0; 777 if (workers[i].pid == 0) { /* Slave starts up here */ 778 for (j = 0; j <= i; j++) 779 (void) close(workers[j].fd); 780 signal(SIGINT, SIG_IGN); /* Master handles this */ 781 signal(SIGINFO, SIG_IGN); 782 doworker(cmd[0], i); 783 Exit(X_FINOK); 784 } 785 } 786 787 for (i = 0; i < WORKERS; i++) 788 (void) atomic_write(workers[i].fd, 789 &workers[(i + 1) % WORKERS].pid, 790 sizeof workers[0].pid); 791 792 master = 0; 793 } 794 795 void 796 killall(void) 797 { 798 int i; 799 800 for (i = 0; i < WORKERS; i++) 801 if (workers[i].pid > 0) { 802 (void) kill(workers[i].pid, SIGKILL); 803 workers[i].sent = 0; 804 } 805 } 806 807 /* 808 * Synchronization - each process has a lockfile, and shares file 809 * descriptors to the following process's lockfile. When our write 810 * completes, we release our lock on the following process's lock- 811 * file, allowing the following process to lock it and proceed. We 812 * get the lock back for the next cycle by swapping descriptors. 
 */
/*
 * doworker --
 *	Main loop of a worker process; `cmd' is the worker's end of its
 *	command socket.  It first reads the pid of the next worker, then
 *	repeats: read reqsiz bytes of request packets; fill wp->tblock
 *	either from the disk (dblk != 0, via bread()) or from the socket
 *	(dblk == 0, one record each); wait until `caught' is set; write
 *	writesize bytes to the tape; report the number of bytes written
 *	(0 on end of tape) back to the master; signal the next worker.
 *	It returns once a full request list can no longer be read (0 bytes
 *	at EOF is a normal finish, anything else is fatal).
 */
static void
doworker(int cmd, int worker_number __unused)
{
	int nread, nextworker, size, wrote, eot_count, werror;
	sigset_t nsigset, osigset;

	wrote = 0;
	/*
	 * Need our own seek pointer.
	 */
	(void) close(diskfd);
	if ((diskfd = open(disk_dev, O_RDONLY)) < 0)
		quite(errno, "worker couldn't reopen disk");

	/*
	 * Need the pid of the next worker in the loop...
	 */
	if ((nread = atomic_read(cmd, &nextworker, sizeof nextworker))
	    != sizeof nextworker) {
		quit("master/worker protocol botched - didn't get pid"
		    " of next worker");
	}

	/*
	 * Get list of blocks to dump, read the blocks into tape buffer
	 */
	while ((nread = atomic_read(cmd, wp->req, reqsiz)) == reqsiz) {
		struct req *p = wp->req;

		/* a packet covers p->count consecutive record slots */
		for (trecno = 0; trecno < ntrec;
		    trecno += p->count, p += p->count) {
			if (p->dblk) {
				bread(p->dblk, wp->tblock[trecno],
				    p->count * TP_BSIZE);
			} else {
				/* in-line record: exactly one, sent after the packets */
				if (p->count != 1 || atomic_read(cmd,
				    wp->tblock[trecno],
				    TP_BSIZE) != TP_BSIZE)
					quit("master/worker protocol botched");
			}
		}

		/*
		 * Wait for our turn at the tape.  SIGUSR2 is blocked while
		 * `caught' is tested, and sigsuspend() unblocks it atomically,
		 * so a signal arriving between the test and the wait is not
		 * lost.
		 */
		sigemptyset(&nsigset);
		sigaddset(&nsigset, SIGUSR2);
		sigprocmask(SIG_BLOCK, &nsigset, &osigset);
		while (!caught)
			sigsuspend(&osigset);
		caught = 0;
		sigprocmask(SIG_SETMASK, &osigset, NULL);

		/* Try to write the data... */
		eot_count = 0;
		size = 0;
		werror = 0;

		/* retry short writes; give up after 10 zero-length writes */
		while (eot_count < 10 && size < writesize) {
#ifdef RDUMP
			if (host)
				wrote = rmtwrite(wp->tblock[0]+size,
				    writesize-size);
			else
#endif
				wrote = write(tapefd, wp->tblock[0]+size,
				    writesize-size);
			werror = errno;
#ifdef WRITEDEBUG
			fprintf(stderr, "worker %d wrote %d werror %d\n",
			    worker_number, wrote, werror);
#endif
			if (wrote < 0)
				break;
			if (wrote == 0)
				eot_count++;
			size += wrote;
		}

#ifdef WRITEDEBUG
		if (size != writesize)
			fprintf(stderr,
			    "worker %d only wrote %d out of %d bytes and gave up.\n",
			    worker_number, size, writesize);
#endif

		/*
		 * Handle ENOSPC as an EOT condition.
		 */
		if (wrote < 0 && werror == ENOSPC) {
			wrote = 0;
			eot_count++;
		}

		/* any EOT is reported to the master as 0 bytes written */
		if (eot_count > 0)
			size = 0;

		if (wrote < 0) {
			/*
			 * Hard write error: notify the master (its SIGUSR1
			 * handler is tperror()) and block until killed.
			 */
			(void) kill(master, SIGUSR1);
			sigemptyset(&nsigset);
			for (;;)
				sigsuspend(&nsigset);
		} else {
			/*
			 * pass size of write back to master
			 * (for EOT handling)
			 */
			(void) atomic_write(cmd, &size, sizeof size);
		}

		/*
		 * If partial write, don't want next worker to go.
		 * Also jolts him awake.
		 * NOTE(review): the signal below is sent unconditionally,
		 * including after a short/EOT write; the first line of this
		 * comment does not match the code.
		 */
		(void) kill(nextworker, SIGUSR2);
	}
	printcachestats();
	/* 0 == orderly EOF from the master; anything else is a short read or error */
	if (nread != 0)
		quite(errno, "error reading command pipe");
}

/*
 * Since a read from a pipe may not return all we asked for,
 * loop until the count is satisfied (or error).
 * Returns count on success, fewer bytes if EOF came first, or the
 * negative read(2) result on error (any bytes already read are then
 * not reported).
 */
static ssize_t
atomic_read(int fd, void *buf, int count)
{
	ssize_t got, need = count;

	while ((got = read(fd, buf, need)) > 0 && (need -= got) > 0)
		buf = (char *)buf + got;
	return (got < 0 ? got : count - need);
}

/*
 * Since a write may not write all we ask if we get a signal,
 * loop until the count is satisfied (or error).
 * Returns count on success, fewer bytes if write(2) returned 0, or the
 * negative write(2) result on error.
 */
static ssize_t
atomic_write(int fd, const void *buf, int count)
{
	ssize_t got, need = count;

	while ((got = write(fd, buf, need)) > 0 && (need -= got) > 0)
		buf = (const char *)buf + got;
	return (got < 0 ? got : count - need);
}