1 #include "common.h"
2 #include <ctype.h>
3
/* forward declarations of routines defined later in this file */
void doalldirs(void);
void dodir(char*);
void dofile(Dir*);
void rundir(char*);
char* file(char*, char);
void warning(char*, void*);
void error(char*, void*);
int returnmail(char**, char*, char*);
void logit(char*, char*, char**);
void doload(int);

#define HUNK 32 /* NOTE(review): not referenced in the visible part of this file */
char *cmd; /* command exec'd for each message; main() takes it from argv[1] */
char *root; /* queue root directory; main() takes it from argv[0] and cds into it */
int debug; /* -d: print progress messages on fd 2 */
int giveup = 2*24*60*60; /* age in seconds after which dofile() returns a message; -t sets it in hours */
int load; /* -l: ceiling checked by doload(); <= 0 disables load balancing */
int limit; /* -r: set by main(); NOTE(review): not read in the visible part of this file */

/* the current directory */
Dir *dirbuf; /* entries read by rundir(), also scanned by remmatch() */
long ndirbuf = 0; /* NOTE(review): not referenced in the visible part of this file */
int nfiles; /* number of entries in dirbuf */
char *curdir; /* name of the directory being run; used as a prefix in log lines */

char *runqlog = "runq"; /* log name handed to syslog() */

int *pidlist; /* npid slots of child pids, managed by forkltd() */
char **badsys; /* array of recalcitrant systems */
int nbad; /* number of entries in badsys */
int npid = 50; /* -n: number of slots in pidlist, i.e. children forkltd() allows at once */
int sflag; /* single thread per directory */
int aflag; /* all directories */
int Eflag; /* ignore E.xxxxxx dates */
int Rflag; /* no giving up, ever */
39
/*
 * print the command-line synopsis on fd 2 and exit.
 * the synopsis lists -R, which main() accepts, and the exit
 * status is non-empty so that callers can tell the run failed.
 */
void
usage(void)
{
	fprint(2, "usage: runq [-adsER] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
	exits("usage");
}
46
47 void
main(int argc,char ** argv)48 main(int argc, char **argv)
49 {
50 char *qdir, *x;
51
52 qdir = 0;
53
54 ARGBEGIN{
55 case 'l':
56 x = ARGF();
57 if(x == 0)
58 usage();
59 load = atoi(x);
60 if(load < 0)
61 load = 0;
62 break;
63 case 'E':
64 Eflag++;
65 break;
66 case 'R': /* no giving up -- just leave stuff in the queue */
67 Rflag++;
68 break;
69 case 'a':
70 aflag++;
71 break;
72 case 'd':
73 debug++;
74 break;
75 case 'r':
76 limit = atoi(ARGF());
77 break;
78 case 's':
79 sflag++;
80 break;
81 case 't':
82 giveup = 60*60*atoi(ARGF());
83 break;
84 case 'q':
85 qdir = ARGF();
86 if(qdir == 0)
87 usage();
88 break;
89 case 'n':
90 npid = atoi(ARGF());
91 if(npid == 0)
92 usage();
93 break;
94 }ARGEND;
95
96 if(argc != 2)
97 usage();
98
99 pidlist = malloc(npid*sizeof(*pidlist));
100 if(pidlist == 0)
101 error("can't malloc", 0);
102
103 if(aflag == 0 && qdir == 0) {
104 qdir = getuser();
105 if(qdir == 0)
106 error("unknown user", 0);
107 }
108 root = argv[0];
109 cmd = argv[1];
110
111 if(chdir(root) < 0)
112 error("can't cd to %s", root);
113
114 doload(1);
115 if(aflag)
116 doalldirs();
117 else
118 dodir(qdir);
119 doload(0);
120 exits(0);
121 }
122
123 int
emptydir(char * name)124 emptydir(char *name)
125 {
126 int fd;
127 long n;
128 char buf[2048];
129
130 fd = open(name, OREAD);
131 if(fd < 0)
132 return 1;
133 n = read(fd, buf, sizeof(buf));
134 close(fd);
135 if(n <= 0) {
136 if(debug)
137 fprint(2, "removing directory %s\n", name);
138 syslog(0, runqlog, "rmdir %s", name);
139 sysremove(name);
140 return 1;
141 }
142 return 0;
143 }
144
145 int
forkltd(void)146 forkltd(void)
147 {
148 int i;
149 int pid;
150
151 for(i = 0; i < npid; i++){
152 if(pidlist[i] <= 0)
153 break;
154 }
155
156 while(i >= npid){
157 pid = waitpid();
158 if(pid < 0){
159 syslog(0, runqlog, "forkltd confused");
160 exits(0);
161 }
162
163 for(i = 0; i < npid; i++)
164 if(pidlist[i] == pid)
165 break;
166 }
167 pidlist[i] = fork();
168 return pidlist[i];
169 }
170
171 /*
172 * run all user directories, must be bootes (or root on unix) to do this
173 */
174 void
doalldirs(void)175 doalldirs(void)
176 {
177 Dir *db;
178 int fd;
179 long i, n;
180
181
182 fd = open(".", OREAD);
183 if(fd == -1){
184 warning("reading %s", root);
185 return;
186 }
187 n = sysdirreadall(fd, &db);
188 if(n > 0){
189 for(i=0; i<n; i++){
190 if(db[i].qid.type & QTDIR){
191 if(emptydir(db[i].name))
192 continue;
193 switch(forkltd()){
194 case -1:
195 syslog(0, runqlog, "out of procs");
196 doload(0);
197 exits(0);
198 case 0:
199 if(sysdetach() < 0)
200 error("%r", 0);
201 dodir(db[i].name);
202 exits(0);
203 default:
204 break;
205 }
206 }
207 }
208 free(db);
209 }
210 close(fd);
211 }
212
213 /*
214 * cd to a user directory and run it
215 */
216 void
dodir(char * name)217 dodir(char *name)
218 {
219 curdir = name;
220
221 if(chdir(name) < 0){
222 warning("cd to %s", name);
223 return;
224 }
225 if(debug)
226 fprint(2, "running %s\n", name);
227 rundir(name);
228 chdir("..");
229 }
230
231 /*
232 * run the current directory
233 */
234 void
rundir(char * name)235 rundir(char *name)
236 {
237 int fd;
238 long i;
239
240 if(aflag && sflag)
241 fd = sysopenlocked(".", OREAD);
242 else
243 fd = open(".", OREAD);
244 if(fd == -1){
245 warning("reading %s", name);
246 return;
247 }
248 nfiles = sysdirreadall(fd, &dirbuf);
249 if(nfiles > 0){
250 for(i=0; i<nfiles; i++){
251 if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
252 continue;
253 dofile(&dirbuf[i]);
254 }
255 free(dirbuf);
256 }
257 if(aflag && sflag)
258 sysunlockfile(fd);
259 else
260 close(fd);
261 }
262
263 /*
264 * free files matching name in the current directory
265 */
266 void
remmatch(char * name)267 remmatch(char *name)
268 {
269 long i;
270
271 syslog(0, runqlog, "removing %s/%s", curdir, name);
272
273 for(i=0; i<nfiles; i++){
274 if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
275 sysremove(dirbuf[i].name);
276 }
277
278 /* error file (may have) appeared after we read the directory */
279 /* stomp on data file in case of phase error */
280 sysremove(file(name, 'D'));
281 sysremove(file(name, 'E'));
282 }
283
284 /*
285 * like trylock, but we've already got the lock on fd,
286 * and don't want an L. lock file.
287 */
288 static Mlock *
keeplockalive(char * path,int fd)289 keeplockalive(char *path, int fd)
290 {
291 char buf[1];
292 Mlock *l;
293
294 l = malloc(sizeof(Mlock));
295 if(l == 0)
296 return 0;
297 l->fd = fd;
298 l->name = s_new();
299 s_append(l->name, path);
300
301 /* fork process to keep lock alive until sysunlock(l) */
302 switch(l->pid = rfork(RFPROC)){
303 default:
304 break;
305 case 0:
306 fd = l->fd;
307 for(;;){
308 sleep(1000*60);
309 if(pread(fd, buf, 1, 0) < 0)
310 break;
311 }
312 _exits(0);
313 }
314 return l;
315 }
316
317 /*
318 * try a message
319 */
320 void
dofile(Dir * dp)321 dofile(Dir *dp)
322 {
323 Dir *d;
324 int dfd, ac, dtime, efd, pid, i, etime;
325 char *buf, *cp, **av;
326 Waitmsg *wm;
327 Biobuf *b;
328 Mlock *l = nil;
329
330 if(debug)
331 fprint(2, "dofile %s\n", dp->name);
332 /*
333 * if no data file or empty control or data file, just clean up
334 * the empty control file must be 15 minutes old, to minimize the
335 * chance of a race.
336 */
337 d = dirstat(file(dp->name, 'D'));
338 if(d == nil){
339 syslog(0, runqlog, "no data file for %s", dp->name);
340 remmatch(dp->name);
341 return;
342 }
343 if(dp->length == 0){
344 if(time(0)-dp->mtime > 15*60){
345 syslog(0, runqlog, "empty ctl file for %s", dp->name);
346 remmatch(dp->name);
347 }
348 return;
349 }
350 dtime = d->mtime;
351 free(d);
352
353 /*
354 * retry times depend on the age of the errors file
355 */
356 if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
357 etime = d->mtime;
358 free(d);
359 if(etime - dtime < 15*60){
360 /* up to the first 15 minutes, every 30 seconds */
361 if(time(0) - etime < 30)
362 return;
363 } else if(etime - dtime < 60*60){
364 /* up to the first hour, try every 15 minutes */
365 if(time(0) - etime < 15*60)
366 return;
367 } else {
368 /* after the first hour, try once an hour */
369 if(time(0) - etime < 60*60)
370 return;
371 }
372
373 }
374
375 /*
376 * open control and data
377 */
378 b = sysopen(file(dp->name, 'C'), "rl", 0660);
379 if(b == 0) {
380 if(debug)
381 fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
382 return;
383 }
384 dfd = open(file(dp->name, 'D'), OREAD);
385 if(dfd < 0){
386 if(debug)
387 fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
388 Bterm(b);
389 sysunlockfile(Bfildes(b));
390 return;
391 }
392
393 /*
394 * make arg list
395 * - read args into (malloc'd) buffer
396 * - malloc a vector and copy pointers to args into it
397 */
398 buf = malloc(dp->length+1);
399 if(buf == 0){
400 warning("buffer allocation", 0);
401 Bterm(b);
402 sysunlockfile(Bfildes(b));
403 close(dfd);
404 return;
405 }
406 if(Bread(b, buf, dp->length) != dp->length){
407 warning("reading control file %s\n", dp->name);
408 Bterm(b);
409 sysunlockfile(Bfildes(b));
410 close(dfd);
411 free(buf);
412 return;
413 }
414 buf[dp->length] = 0;
415 av = malloc(2*sizeof(char*));
416 if(av == 0){
417 warning("argv allocation", 0);
418 close(dfd);
419 free(buf);
420 Bterm(b);
421 sysunlockfile(Bfildes(b));
422 return;
423 }
424 for(ac = 1, cp = buf; *cp; ac++){
425 while(isspace(*cp))
426 *cp++ = 0;
427 if(*cp == 0)
428 break;
429
430 av = realloc(av, (ac+2)*sizeof(char*));
431 if(av == 0){
432 warning("argv allocation", 0);
433 close(dfd);
434 free(buf);
435 Bterm(b);
436 sysunlockfile(Bfildes(b));
437 return;
438 }
439 av[ac] = cp;
440 while(*cp && !isspace(*cp)){
441 if(*cp++ == '"'){
442 while(*cp && *cp != '"')
443 cp++;
444 if(*cp)
445 cp++;
446 }
447 }
448 }
449 av[0] = cmd;
450 av[ac] = 0;
451
452 if(!Eflag &&time(0) - dtime > giveup){
453 if(returnmail(av, dp->name, "Giveup") != 0)
454 logit("returnmail failed", dp->name, av);
455 remmatch(dp->name);
456 goto done;
457 }
458
459 for(i = 0; i < nbad; i++){
460 if(strcmp(av[3], badsys[i]) == 0)
461 goto done;
462 }
463
464 /*
465 * Ken's fs, for example, gives us 5 minutes of inactivity before
466 * the lock goes stale, so we have to keep reading it.
467 */
468 l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
469
470 /*
471 * transfer
472 */
473 pid = fork();
474 switch(pid){
475 case -1:
476 sysunlock(l);
477 sysunlockfile(Bfildes(b));
478 syslog(0, runqlog, "out of procs");
479 exits(0);
480 case 0:
481 if(debug) {
482 fprint(2, "Starting %s", cmd);
483 for(ac = 0; av[ac]; ac++)
484 fprint(2, " %s", av[ac]);
485 fprint(2, "\n");
486 }
487 logit("execing", dp->name, av);
488 close(0);
489 dup(dfd, 0);
490 close(dfd);
491 close(2);
492 efd = open(file(dp->name, 'E'), OWRITE);
493 if(efd < 0){
494 if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
495 efd = create(file(dp->name, 'E'), OWRITE, 0666);
496 if(efd < 0){
497 if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
498 exits("could not open error file - Retry");
499 }
500 }
501 seek(efd, 0, 2);
502 exec(cmd, av);
503 error("can't exec %s", cmd);
504 break;
505 default:
506 for(;;){
507 wm = wait();
508 if(wm == nil)
509 error("wait failed: %r", "");
510 if(wm->pid == pid)
511 break;
512 free(wm);
513 }
514 if(debug)
515 fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
516
517 if(wm->msg[0]){
518 if(debug)
519 fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
520 syslog(0, runqlog, "message: %s\n", wm->msg);
521 if(strstr(wm->msg, "Ignore") != nil){
522 /* fix for fish/chips, leave message alone */
523 logit("ignoring", dp->name, av);
524 }else if(!Rflag && strstr(wm->msg, "Retry")==0){
525 /* return the message and remove it */
526 if(returnmail(av, dp->name, wm->msg) != 0)
527 logit("returnmail failed", dp->name, av);
528 remmatch(dp->name);
529 } else {
530 /* add sys to bad list and try again later */
531 nbad++;
532 badsys = realloc(badsys, nbad*sizeof(char*));
533 badsys[nbad-1] = strdup(av[3]);
534 }
535 } else {
536 /* it worked remove the message */
537 remmatch(dp->name);
538 }
539 free(wm);
540
541 }
542 done:
543 if (l)
544 sysunlock(l);
545 Bterm(b);
546 sysunlockfile(Bfildes(b));
547 free(buf);
548 free(av);
549 close(dfd);
550 }
551
552
553 /*
554 * return a name starting with the given character
555 */
556 char*
file(char * name,char type)557 file(char *name, char type)
558 {
559 static char nname[Elemlen+1];
560
561 strncpy(nname, name, Elemlen);
562 nname[Elemlen] = 0;
563 nname[0] = type;
564 return nname;
565 }
566
567 /*
568 * send back the mail with an error message
569 *
570 * return 0 if successful
571 */
572 int
returnmail(char ** av,char * name,char * msg)573 returnmail(char **av, char *name, char *msg)
574 {
575 int pfd[2];
576 Waitmsg *wm;
577 int fd;
578 char buf[256];
579 char attachment[256];
580 int i;
581 long n;
582 String *s;
583 char *sender;
584
585 if(av[1] == 0 || av[2] == 0){
586 logit("runq - dumping bad file", name, av);
587 return 0;
588 }
589
590 s = unescapespecial(s_copy(av[2]));
591 sender = s_to_c(s);
592
593 if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
594 logit("runq - dumping p to p mail", name, av);
595 return 0;
596 }
597
598 if(pipe(pfd) < 0){
599 logit("runq - pipe failed", name, av);
600 return -1;
601 }
602
603 switch(rfork(RFFDG|RFPROC|RFENVG)){
604 case -1:
605 logit("runq - fork failed", name, av);
606 return -1;
607 case 0:
608 logit("returning", name, av);
609 close(pfd[1]);
610 close(0);
611 dup(pfd[0], 0);
612 close(pfd[0]);
613 putenv("upasname", "/dev/null");
614 snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
615 snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
616 execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
617 error("can't exec", 0);
618 break;
619 default:
620 break;
621 }
622
623 close(pfd[0]);
624 fprint(pfd[1], "\n"); /* get out of headers */
625 if(av[1]){
626 fprint(pfd[1], "Your request ``%.20s ", av[1]);
627 for(n = 3; av[n]; n++)
628 fprint(pfd[1], "%s ", av[n]);
629 }
630 fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
631 fd = open(file(name, 'E'), OREAD);
632 if(fd >= 0){
633 for(;;){
634 n = read(fd, buf, sizeof(buf));
635 if(n <= 0)
636 break;
637 if(write(pfd[1], buf, n) != n){
638 close(fd);
639 goto out;
640 }
641 }
642 close(fd);
643 }
644 close(pfd[1]);
645 out:
646 wm = wait();
647 if(wm == nil){
648 syslog(0, "runq", "wait: %r");
649 logit("wait failed", name, av);
650 return -1;
651 }
652 i = 0;
653 if(wm->msg[0]){
654 i = -1;
655 syslog(0, "runq", "returnmail child: %s", wm->msg);
656 logit("returnmail child failed", name, av);
657 }
658 free(wm);
659 return i;
660 }
661
/*
 *  print a warning and continue
 *
 *  f is a print format taking at most one argument, a.  the
 *  current error string is fetched first and appended to the
 *  formatted text on fd 2.
 */
void
warning(char *f, void *a)
{
	char text[256];
	char syserr[65];

	rerrstr(syserr, sizeof syserr);
	snprint(text, sizeof text, f, a);
	fprint(2, "runq: %s: %s\n", text, syserr);
}
675
676 /*
677 * print an error and die
678 */
679 void
error(char * f,void * a)680 error(char *f, void *a)
681 {
682 char err[Errlen];
683 char buf[256];
684
685 rerrstr(err, sizeof(err));
686 snprint(buf, sizeof(buf), f, a);
687 fprint(2, "runq: %s: %s\n", buf, err);
688 exits(buf);
689 }
690
691 void
logit(char * msg,char * file,char ** av)692 logit(char *msg, char *file, char **av)
693 {
694 int n, m;
695 char buf[256];
696
697 n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
698 for(; *av; av++){
699 m = strlen(*av);
700 if(n + m + 4 > sizeof(buf))
701 break;
702 sprint(buf + n, " '%s'", *av);
703 n += m + 3;
704 }
705 syslog(0, runqlog, "%s", buf);
706 }
707
708 char *loadfile = ".runqload";
709
710 /*
711 * load balancing
712 */
713 void
doload(int start)714 doload(int start)
715 {
716 int fd;
717 char buf[32];
718 int i, n;
719 Mlock *l;
720 Dir *d;
721
722 if(load <= 0)
723 return;
724
725 if(chdir(root) < 0){
726 load = 0;
727 return;
728 }
729
730 l = syslock(loadfile);
731 fd = open(loadfile, ORDWR);
732 if(fd < 0){
733 fd = create(loadfile, 0666, ORDWR);
734 if(fd < 0){
735 load = 0;
736 sysunlock(l);
737 return;
738 }
739 }
740
741 /* get current load */
742 i = 0;
743 n = read(fd, buf, sizeof(buf)-1);
744 if(n >= 0){
745 buf[n] = 0;
746 i = atoi(buf);
747 }
748 if(i < 0)
749 i = 0;
750
751 /* ignore load if file hasn't been changed in 30 minutes */
752 d = dirfstat(fd);
753 if(d != nil){
754 if(d->mtime + 30*60 < time(0))
755 i = 0;
756 free(d);
757 }
758
759 /* if load already too high, give up */
760 if(start && i >= load){
761 sysunlock(l);
762 exits(0);
763 }
764
765 /* increment/decrement load */
766 if(start)
767 i++;
768 else
769 i--;
770 seek(fd, 0, 0);
771 fprint(fd, "%d\n", i);
772 sysunlock(l);
773 close(fd);
774 }
775