1 #include "common.h" 2 #include <ctype.h> 3 4 void doalldirs(void); 5 void dodir(char*); 6 void dofile(Dir*); 7 void rundir(char*); 8 char* file(char*, char); 9 void warning(char*, void*); 10 void error(char*, void*); 11 int returnmail(char**, char*, char*); 12 void logit(char*, char*, char**); 13 void doload(int); 14 15 #define HUNK 32 16 char *cmd; 17 char *root; 18 int debug; 19 int giveup = 2*24*60*60; 20 int load; 21 int limit; 22 23 /* the current directory */ 24 Dir *dirbuf; 25 long ndirbuf = 0; 26 int nfiles; 27 char *curdir; 28 29 char *runqlog = "runq"; 30 31 int *pidlist; 32 char **badsys; /* array of recalcitrant systems */ 33 int nbad; 34 int npid = 50; 35 int sflag; /* single thread per directory */ 36 int aflag; /* all directories */ 37 int Eflag; /* ignore E.xxxxxx dates */ 38 int Rflag; /* no giving up, ever */ 39 40 void 41 usage(void) 42 { 43 fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); 44 exits(""); 45 } 46 47 void 48 main(int argc, char **argv) 49 { 50 char *qdir, *x; 51 52 qdir = 0; 53 54 ARGBEGIN{ 55 case 'l': 56 x = ARGF(); 57 if(x == 0) 58 usage(); 59 load = atoi(x); 60 if(load < 0) 61 load = 0; 62 break; 63 case 'E': 64 Eflag++; 65 break; 66 case 'R': /* no giving up -- just leave stuff in the queue */ 67 Rflag++; 68 break; 69 case 'a': 70 aflag++; 71 break; 72 case 'd': 73 debug++; 74 break; 75 case 'r': 76 limit = atoi(ARGF()); 77 break; 78 case 's': 79 sflag++; 80 break; 81 case 't': 82 giveup = 60*60*atoi(ARGF()); 83 break; 84 case 'q': 85 qdir = ARGF(); 86 if(qdir == 0) 87 usage(); 88 break; 89 case 'n': 90 npid = atoi(ARGF()); 91 if(npid == 0) 92 usage(); 93 break; 94 }ARGEND; 95 96 if(argc != 2) 97 usage(); 98 99 pidlist = malloc(npid*sizeof(*pidlist)); 100 if(pidlist == 0) 101 error("can't malloc", 0); 102 103 if(aflag == 0 && qdir == 0) { 104 qdir = getuser(); 105 if(qdir == 0) 106 error("unknown user", 0); 107 } 108 root = argv[0]; 109 cmd = argv[1]; 110 111 if(chdir(root) < 0) 112 error("can't cd to %s", root); 113 114 doload(1); 115 if(aflag) 116 doalldirs(); 117 else 118 dodir(qdir); 119 doload(0); 120 exits(0); 121 } 122 123 int 124 emptydir(char *name) 125 { 126 int fd; 127 long n; 128 char buf[2048]; 129 130 fd = open(name, OREAD); 131 if(fd < 0) 132 return 1; 133 n = read(fd, buf, sizeof(buf)); 134 close(fd); 135 if(n <= 0) { 136 if(debug) 137 fprint(2, "removing directory %s\n", name); 138 syslog(0, runqlog, "rmdir %s", name); 139 sysremove(name); 140 return 1; 141 } 142 return 0; 143 } 144 145 int 146 forkltd(void) 147 { 148 int i; 149 int pid; 150 151 for(i = 0; i < npid; i++){ 152 if(pidlist[i] <= 0) 153 break; 154 } 155 156 while(i >= npid){ 157 pid = waitpid(); 158 if(pid < 0){ 159 syslog(0, runqlog, "forkltd confused"); 160 exits(0); 161 } 162 163 for(i = 0; i < npid; i++) 164 if(pidlist[i] == pid) 165 break; 166 } 167 pidlist[i] = fork(); 168 return pidlist[i]; 169 } 170 171 /* 172 * run all user directories, must be bootes (or root on unix) to do this 173 */ 174 void 175 doalldirs(void) 176 { 177 Dir *db; 178 int fd; 179 long i, n; 180 181 182 fd = open(".", OREAD); 183 if(fd == -1){ 184 warning("reading %s", root); 185 return; 186 } 187 n = sysdirreadall(fd, &db); 188 if(n > 0){ 189 for(i=0; i<n; i++){ 190 if(db[i].qid.type & QTDIR){ 191 if(emptydir(db[i].name)) 192 continue; 193 switch(forkltd()){ 194 case -1: 195 syslog(0, runqlog, "out of procs"); 196 doload(0); 197 exits(0); 198 case 0: 199 if(sysdetach() < 0) 200 error("%r", 0); 201 dodir(db[i].name); 202 exits(0); 203 default: 204 break; 205 } 206 } 207 } 208 free(db); 209 } 210 close(fd); 211 } 212 213 /* 214 * cd to a user directory and run it 215 */ 216 void 217 dodir(char *name) 218 { 219 curdir = name; 220 221 if(chdir(name) < 0){ 222 warning("cd to %s", name); 223 return; 224 } 225 if(debug) 226 fprint(2, "running %s\n", name); 227 rundir(name); 228 chdir(".."); 229 } 230 231 /* 232 * run the current directory 233 */ 234 void 235 rundir(char *name) 236 { 237 int fd; 238 long i; 239 240 if(aflag && sflag) 241 fd = sysopenlocked(".", OREAD); 242 else 243 fd = open(".", OREAD); 244 if(fd == -1){ 245 warning("reading %s", name); 246 return; 247 } 248 nfiles = sysdirreadall(fd, &dirbuf); 249 if(nfiles > 0){ 250 for(i=0; i<nfiles; i++){ 251 if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.') 252 continue; 253 dofile(&dirbuf[i]); 254 } 255 free(dirbuf); 256 } 257 if(aflag && sflag) 258 sysunlockfile(fd); 259 else 260 close(fd); 261 } 262 263 /* 264 * free files matching name in the current directory 265 */ 266 void 267 remmatch(char *name) 268 { 269 long i; 270 271 syslog(0, runqlog, "removing %s/%s", curdir, name); 272 273 for(i=0; i<nfiles; i++){ 274 if(strcmp(&dirbuf[i].name[1], &name[1]) == 0) 275 sysremove(dirbuf[i].name); 276 } 277 278 /* error file (may have) appeared after we read the directory */ 279 /* stomp on data file in case of phase error */ 280 sysremove(file(name, 'D')); 281 sysremove(file(name, 'E')); 282 } 283 284 /* 285 * like trylock, but we've already got the lock on fd, 286 * and don't want an L. lock file. 287 */ 288 static Mlock * 289 keeplockalive(char *path, int fd) 290 { 291 char buf[1]; 292 Mlock *l; 293 294 l = malloc(sizeof(Mlock)); 295 if(l == 0) 296 return 0; 297 l->fd = fd; 298 l->name = s_new(); 299 s_append(l->name, path); 300 301 /* fork process to keep lock alive until sysunlock(l) */ 302 switch(l->pid = rfork(RFPROC)){ 303 default: 304 break; 305 case 0: 306 fd = l->fd; 307 for(;;){ 308 sleep(1000*60); 309 if(pread(fd, buf, 1, 0) < 0) 310 break; 311 } 312 _exits(0); 313 } 314 return l; 315 } 316 317 /* 318 * try a message 319 */ 320 void 321 dofile(Dir *dp) 322 { 323 Dir *d; 324 int dfd, ac, dtime, efd, pid, i, etime; 325 char *buf, *cp, **av; 326 Waitmsg *wm; 327 Biobuf *b; 328 Mlock *l = nil; 329 330 if(debug) 331 fprint(2, "dofile %s\n", dp->name); 332 /* 333 * if no data file or empty control or data file, just clean up 334 * the empty control file must be 15 minutes old, to minimize the 335 * chance of a race. 336 */ 337 d = dirstat(file(dp->name, 'D')); 338 if(d == nil){ 339 syslog(0, runqlog, "no data file for %s", dp->name); 340 remmatch(dp->name); 341 return; 342 } 343 if(dp->length == 0){ 344 if(time(0)-dp->mtime > 15*60){ 345 syslog(0, runqlog, "empty ctl file for %s", dp->name); 346 remmatch(dp->name); 347 } 348 return; 349 } 350 dtime = d->mtime; 351 free(d); 352 353 /* 354 * retry times depend on the age of the errors file 355 */ 356 if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){ 357 etime = d->mtime; 358 free(d); 359 if(etime - dtime < 15*60){ 360 /* up to the first 15 minutes, every 30 seconds */ 361 if(time(0) - etime < 30) 362 return; 363 } else if(etime - dtime < 60*60){ 364 /* up to the first hour, try every 15 minutes */ 365 if(time(0) - etime < 15*60) 366 return; 367 } else { 368 /* after the first hour, try once an hour */ 369 if(time(0) - etime < 60*60) 370 return; 371 } 372 373 } 374 375 /* 376 * open control and data 377 */ 378 b = sysopen(file(dp->name, 'C'), "rl", 0660); 379 if(b == 0) { 380 if(debug) 381 fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); 382 return; 383 } 384 dfd = open(file(dp->name, 'D'), OREAD); 385 if(dfd < 0){ 386 if(debug) 387 fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); 388 Bterm(b); 389 sysunlockfile(Bfildes(b)); 390 return; 391 } 392 393 /* 394 * make arg list 395 * - read args into (malloc'd) buffer 396 * - malloc a vector and copy pointers to args into it 397 */ 398 buf = malloc(dp->length+1); 399 if(buf == 0){ 400 warning("buffer allocation", 0); 401 Bterm(b); 402 sysunlockfile(Bfildes(b)); 403 close(dfd); 404 return; 405 } 406 if(Bread(b, buf, dp->length) != dp->length){ 407 warning("reading control file %s\n", dp->name); 408 Bterm(b); 409 sysunlockfile(Bfildes(b)); 410 close(dfd); 411 free(buf); 412 return; 413 } 414 buf[dp->length] = 0; 415 av = malloc(2*sizeof(char*)); 416 if(av == 0){ 417 warning("argv allocation", 0); 418 close(dfd); 419 free(buf); 420 Bterm(b); 421 sysunlockfile(Bfildes(b)); 422 return; 423 } 424 for(ac = 1, cp = buf; *cp; ac++){ 425 while(isspace(*cp)) 426 *cp++ = 0; 427 if(*cp == 0) 428 break; 429 430 av = realloc(av, (ac+2)*sizeof(char*)); 431 if(av == 0){ 432 warning("argv allocation", 0); 433 close(dfd); 434 free(buf); 435 Bterm(b); 436 sysunlockfile(Bfildes(b)); 437 return; 438 } 439 av[ac] = cp; 440 while(*cp && !isspace(*cp)){ 441 if(*cp++ == '"'){ 442 while(*cp && *cp != '"') 443 cp++; 444 if(*cp) 445 cp++; 446 } 447 } 448 } 449 av[0] = cmd; 450 av[ac] = 0; 451 452 if(!Eflag &&time(0) - dtime > giveup){ 453 if(returnmail(av, dp->name, "Giveup") != 0) 454 logit("returnmail failed", dp->name, av); 455 remmatch(dp->name); 456 goto done; 457 } 458 459 for(i = 0; i < nbad; i++){ 460 if(strcmp(av[3], badsys[i]) == 0) 461 goto done; 462 } 463 464 /* 465 * Ken's fs, for example, gives us 5 minutes of inactivity before 466 * the lock goes stale, so we have to keep reading it. 467 */ 468 l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); 469 470 /* 471 * transfer 472 */ 473 pid = fork(); 474 switch(pid){ 475 case -1: 476 sysunlock(l); 477 sysunlockfile(Bfildes(b)); 478 syslog(0, runqlog, "out of procs"); 479 exits(0); 480 case 0: 481 if(debug) { 482 fprint(2, "Starting %s", cmd); 483 for(ac = 0; av[ac]; ac++) 484 fprint(2, " %s", av[ac]); 485 fprint(2, "\n"); 486 } 487 logit("execing", dp->name, av); 488 close(0); 489 dup(dfd, 0); 490 close(dfd); 491 close(2); 492 efd = open(file(dp->name, 'E'), OWRITE); 493 if(efd < 0){ 494 if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); 495 efd = create(file(dp->name, 'E'), OWRITE, 0666); 496 if(efd < 0){ 497 if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); 498 exits("could not open error file - Retry"); 499 } 500 } 501 seek(efd, 0, 2); 502 exec(cmd, av); 503 error("can't exec %s", cmd); 504 break; 505 default: 506 for(;;){ 507 wm = wait(); 508 if(wm == nil) 509 error("wait failed: %r", ""); 510 if(wm->pid == pid) 511 break; 512 free(wm); 513 } 514 if(debug) 515 fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); 516 517 if(wm->msg[0]){ 518 if(debug) 519 fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); 520 syslog(0, runqlog, "message: %s\n", wm->msg); 521 if(strstr(wm->msg, "Ignore") != nil){ 522 /* fix for fish/chips, leave message alone */ 523 logit("ignoring", dp->name, av); 524 }else if(!Rflag && strstr(wm->msg, "Retry")==0){ 525 /* return the message and remove it */ 526 if(returnmail(av, dp->name, wm->msg) != 0) 527 logit("returnmail failed", dp->name, av); 528 remmatch(dp->name); 529 } else { 530 /* add sys to bad list and try again later */ 531 nbad++; 532 badsys = realloc(badsys, nbad*sizeof(char*)); 533 badsys[nbad-1] = strdup(av[3]); 534 } 535 } else { 536 /* it worked remove the message */ 537 remmatch(dp->name); 538 } 539 free(wm); 540 541 } 542 done: 543 if (l) 544 sysunlock(l); 545 Bterm(b); 546 sysunlockfile(Bfildes(b)); 547 free(buf); 548 free(av); 549 close(dfd); 550 } 551 552 553 /* 554 * return a name starting with the given character 555 */ 556 char* 557 file(char *name, char type) 558 { 559 static char nname[Elemlen+1]; 560 561 strncpy(nname, name, Elemlen); 562 nname[Elemlen] = 0; 563 nname[0] = type; 564 return nname; 565 } 566 567 /* 568 * send back the mail with an error message 569 * 570 * return 0 if successful 571 */ 572 int 573 returnmail(char **av, char *name, char *msg) 574 { 575 int pfd[2]; 576 Waitmsg *wm; 577 int fd; 578 char buf[256]; 579 char attachment[256]; 580 int i; 581 long n; 582 String *s; 583 char *sender; 584 585 if(av[1] == 0 || av[2] == 0){ 586 logit("runq - dumping bad file", name, av); 587 return 0; 588 } 589 590 s = unescapespecial(s_copy(av[2])); 591 sender = s_to_c(s); 592 593 if(!returnable(sender) || strcmp(sender, "postmaster") == 0) { 594 logit("runq - dumping p to p mail", name, av); 595 return 0; 596 } 597 598 if(pipe(pfd) < 0){ 599 logit("runq - pipe failed", name, av); 600 return -1; 601 } 602 603 switch(rfork(RFFDG|RFPROC|RFENVG)){ 604 case -1: 605 logit("runq - fork failed", name, av); 606 return -1; 607 case 0: 608 logit("returning", name, av); 609 close(pfd[1]); 610 close(0); 611 dup(pfd[0], 0); 612 close(pfd[0]); 613 putenv("upasname", "/dev/null"); 614 snprint(buf, sizeof(buf), "%s/marshal", UPASBIN); 615 snprint(attachment, sizeof(attachment), "%s", file(name, 'D')); 616 execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil); 617 error("can't exec", 0); 618 break; 619 default: 620 break; 621 } 622 623 close(pfd[0]); 624 fprint(pfd[1], "\n"); /* get out of headers */ 625 if(av[1]){ 626 fprint(pfd[1], "Your request ``%.20s ", av[1]); 627 for(n = 3; av[n]; n++) 628 fprint(pfd[1], "%s ", av[n]); 629 } 630 fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg); 631 fd = open(file(name, 'E'), OREAD); 632 if(fd >= 0){ 633 for(;;){ 634 n = read(fd, buf, sizeof(buf)); 635 if(n <= 0) 636 break; 637 if(write(pfd[1], buf, n) != n){ 638 close(fd); 639 goto out; 640 } 641 } 642 close(fd); 643 } 644 close(pfd[1]); 645 out: 646 wm = wait(); 647 if(wm == nil){ 648 syslog(0, "runq", "wait: %r"); 649 logit("wait failed", name, av); 650 return -1; 651 } 652 i = 0; 653 if(wm->msg[0]){ 654 i = -1; 655 syslog(0, "runq", "returnmail child: %s", wm->msg); 656 logit("returnmail child failed", name, av); 657 } 658 free(wm); 659 return i; 660 } 661 662 /* 663 * print a warning and continue 664 */ 665 void 666 warning(char *f, void *a) 667 { 668 char err[65]; 669 char buf[256]; 670 671 rerrstr(err, sizeof(err)); 672 snprint(buf, sizeof(buf), f, a); 673 fprint(2, "runq: %s: %s\n", buf, err); 674 } 675 676 /* 677 * print an error and die 678 */ 679 void 680 error(char *f, void *a) 681 { 682 char err[Errlen]; 683 char buf[256]; 684 685 rerrstr(err, sizeof(err)); 686 snprint(buf, sizeof(buf), f, a); 687 fprint(2, "runq: %s: %s\n", buf, err); 688 exits(buf); 689 } 690 691 void 692 logit(char *msg, char *file, char **av) 693 { 694 int n, m; 695 char buf[256]; 696 697 n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg); 698 for(; *av; av++){ 699 m = strlen(*av); 700 if(n + m + 4 > sizeof(buf)) 701 break; 702 sprint(buf + n, " '%s'", *av); 703 n += m + 3; 704 } 705 syslog(0, runqlog, "%s", buf); 706 } 707 708 char *loadfile = ".runqload"; 709 710 /* 711 * load balancing 712 */ 713 void 714 doload(int start) 715 { 716 int fd; 717 char buf[32]; 718 int i, n; 719 Mlock *l; 720 Dir *d; 721 722 if(load <= 0) 723 return; 724 725 if(chdir(root) < 0){ 726 load = 0; 727 return; 728 } 729 730 l = syslock(loadfile); 731 fd = open(loadfile, ORDWR); 732 if(fd < 0){ 733 fd = create(loadfile, 0666, ORDWR); 734 if(fd < 0){ 735 load = 0; 736 sysunlock(l); 737 return; 738 } 739 } 740 741 /* get current load */ 742 i = 0; 743 n = read(fd, buf, sizeof(buf)-1); 744 if(n >= 0){ 745 buf[n] = 0; 746 i = atoi(buf); 747 } 748 if(i < 0) 749 i = 0; 750 751 /* ignore load if file hasn't been changed in 30 minutes */ 752 d = dirfstat(fd); 753 if(d != nil){ 754 if(d->mtime + 30*60 < time(0)) 755 i = 0; 756 free(d); 757 } 758 759 /* if load already too high, give up */ 760 if(start && i >= load){ 761 sysunlock(l); 762 exits(0); 763 } 764 765 /* increment/decrement load */ 766 if(start) 767 i++; 768 else 769 i--; 770 seek(fd, 0, 0); 771 fprint(fd, "%d\n", i); 772 sysunlock(l); 773 close(fd); 774 } 775