xref: /plan9/sys/src/cmd/upas/q/runq.c (revision 39dc14208bd73723190dc96abcf59ed5c10cfd3b)
1 #include "common.h"
2 #include <ctype.h>
3 
4 void	doalldirs(void);
5 void	dodir(char*);
6 void	dofile(Dir*);
7 void	rundir(char*);
8 char*	file(char*, char);
9 void	warning(char*, void*);
10 void	error(char*, void*);
11 int	returnmail(char**, char*, char*);
12 void	logit(char*, char*, char**);
13 void	doload(int);
14 
15 #define HUNK 32
16 char	*cmd;
17 char	*root;
18 int	debug;
19 int	giveup = 2*24*60*60;
20 int	load;
21 int	limit;
22 
23 /* the current directory */
24 Dir	*dirbuf;
25 long	ndirbuf = 0;
26 int	nfiles;
27 char	*curdir;
28 
29 char *runqlog = "runq";
30 
31 int	*pidlist;
32 char	**badsys;		/* array of recalcitrant systems */
33 int	nbad;
34 int	npid = 50;
35 int	sflag;			/* single thread per directory */
36 int	aflag;			/* all directories */
37 int	Eflag;			/* ignore E.xxxxxx dates */
38 int	Rflag;			/* no giving up, ever */
39 
40 void
usage(void)41 usage(void)
42 {
43 	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
44 	exits("");
45 }
46 
47 void
main(int argc,char ** argv)48 main(int argc, char **argv)
49 {
50 	char *qdir, *x;
51 
52 	qdir = 0;
53 
54 	ARGBEGIN{
55 	case 'l':
56 		x = ARGF();
57 		if(x == 0)
58 			usage();
59 		load = atoi(x);
60 		if(load < 0)
61 			load = 0;
62 		break;
63 	case 'E':
64 		Eflag++;
65 		break;
66 	case 'R':	/* no giving up -- just leave stuff in the queue */
67 		Rflag++;
68 		break;
69 	case 'a':
70 		aflag++;
71 		break;
72 	case 'd':
73 		debug++;
74 		break;
75 	case 'r':
76 		limit = atoi(ARGF());
77 		break;
78 	case 's':
79 		sflag++;
80 		break;
81 	case 't':
82 		giveup = 60*60*atoi(ARGF());
83 		break;
84 	case 'q':
85 		qdir = ARGF();
86 		if(qdir == 0)
87 			usage();
88 		break;
89 	case 'n':
90 		npid = atoi(ARGF());
91 		if(npid == 0)
92 			usage();
93 		break;
94 	}ARGEND;
95 
96 	if(argc != 2)
97 		usage();
98 
99 	pidlist = malloc(npid*sizeof(*pidlist));
100 	if(pidlist == 0)
101 		error("can't malloc", 0);
102 
103 	if(aflag == 0 && qdir == 0) {
104 		qdir = getuser();
105 		if(qdir == 0)
106 			error("unknown user", 0);
107 	}
108 	root = argv[0];
109 	cmd = argv[1];
110 
111 	if(chdir(root) < 0)
112 		error("can't cd to %s", root);
113 
114 	doload(1);
115 	if(aflag)
116 		doalldirs();
117 	else
118 		dodir(qdir);
119 	doload(0);
120 	exits(0);
121 }
122 
123 int
emptydir(char * name)124 emptydir(char *name)
125 {
126 	int fd;
127 	long n;
128 	char buf[2048];
129 
130 	fd = open(name, OREAD);
131 	if(fd < 0)
132 		return 1;
133 	n = read(fd, buf, sizeof(buf));
134 	close(fd);
135 	if(n <= 0) {
136 		if(debug)
137 			fprint(2, "removing directory %s\n", name);
138 		syslog(0, runqlog, "rmdir %s", name);
139 		sysremove(name);
140 		return 1;
141 	}
142 	return 0;
143 }
144 
145 int
forkltd(void)146 forkltd(void)
147 {
148 	int i;
149 	int pid;
150 
151 	for(i = 0; i < npid; i++){
152 		if(pidlist[i] <= 0)
153 			break;
154 	}
155 
156 	while(i >= npid){
157 		pid = waitpid();
158 		if(pid < 0){
159 			syslog(0, runqlog, "forkltd confused");
160 			exits(0);
161 		}
162 
163 		for(i = 0; i < npid; i++)
164 			if(pidlist[i] == pid)
165 				break;
166 	}
167 	pidlist[i] = fork();
168 	return pidlist[i];
169 }
170 
171 /*
172  *  run all user directories, must be bootes (or root on unix) to do this
173  */
174 void
doalldirs(void)175 doalldirs(void)
176 {
177 	Dir *db;
178 	int fd;
179 	long i, n;
180 
181 
182 	fd = open(".", OREAD);
183 	if(fd == -1){
184 		warning("reading %s", root);
185 		return;
186 	}
187 	n = sysdirreadall(fd, &db);
188 	if(n > 0){
189 		for(i=0; i<n; i++){
190 			if(db[i].qid.type & QTDIR){
191 				if(emptydir(db[i].name))
192 					continue;
193 				switch(forkltd()){
194 				case -1:
195 					syslog(0, runqlog, "out of procs");
196 					doload(0);
197 					exits(0);
198 				case 0:
199 					if(sysdetach() < 0)
200 						error("%r", 0);
201 					dodir(db[i].name);
202 					exits(0);
203 				default:
204 					break;
205 				}
206 			}
207 		}
208 		free(db);
209 	}
210 	close(fd);
211 }
212 
213 /*
214  *  cd to a user directory and run it
215  */
216 void
dodir(char * name)217 dodir(char *name)
218 {
219 	curdir = name;
220 
221 	if(chdir(name) < 0){
222 		warning("cd to %s", name);
223 		return;
224 	}
225 	if(debug)
226 		fprint(2, "running %s\n", name);
227 	rundir(name);
228 	chdir("..");
229 }
230 
231 /*
232  *  run the current directory
233  */
234 void
rundir(char * name)235 rundir(char *name)
236 {
237 	int fd;
238 	long i;
239 
240 	if(aflag && sflag)
241 		fd = sysopenlocked(".", OREAD);
242 	else
243 		fd = open(".", OREAD);
244 	if(fd == -1){
245 		warning("reading %s", name);
246 		return;
247 	}
248 	nfiles = sysdirreadall(fd, &dirbuf);
249 	if(nfiles > 0){
250 		for(i=0; i<nfiles; i++){
251 			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
252 				continue;
253 			dofile(&dirbuf[i]);
254 		}
255 		free(dirbuf);
256 	}
257 	if(aflag && sflag)
258 		sysunlockfile(fd);
259 	else
260 		close(fd);
261 }
262 
263 /*
264  *  free files matching name in the current directory
265  */
266 void
remmatch(char * name)267 remmatch(char *name)
268 {
269 	long i;
270 
271 	syslog(0, runqlog, "removing %s/%s", curdir, name);
272 
273 	for(i=0; i<nfiles; i++){
274 		if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
275 			sysremove(dirbuf[i].name);
276 	}
277 
278 	/* error file (may have) appeared after we read the directory */
279 	/* stomp on data file in case of phase error */
280 	sysremove(file(name, 'D'));
281 	sysremove(file(name, 'E'));
282 }
283 
284 /*
285  *  like trylock, but we've already got the lock on fd,
286  *  and don't want an L. lock file.
287  */
288 static Mlock *
keeplockalive(char * path,int fd)289 keeplockalive(char *path, int fd)
290 {
291 	char buf[1];
292 	Mlock *l;
293 
294 	l = malloc(sizeof(Mlock));
295 	if(l == 0)
296 		return 0;
297 	l->fd = fd;
298 	l->name = s_new();
299 	s_append(l->name, path);
300 
301 	/* fork process to keep lock alive until sysunlock(l) */
302 	switch(l->pid = rfork(RFPROC)){
303 	default:
304 		break;
305 	case 0:
306 		fd = l->fd;
307 		for(;;){
308 			sleep(1000*60);
309 			if(pread(fd, buf, 1, 0) < 0)
310 				break;
311 		}
312 		_exits(0);
313 	}
314 	return l;
315 }
316 
317 /*
318  *  try a message
319  */
320 void
dofile(Dir * dp)321 dofile(Dir *dp)
322 {
323 	Dir *d;
324 	int dfd, ac, dtime, efd, pid, i, etime;
325 	char *buf, *cp, **av;
326 	Waitmsg *wm;
327 	Biobuf *b;
328 	Mlock *l = nil;
329 
330 	if(debug)
331 		fprint(2, "dofile %s\n", dp->name);
332 	/*
333 	 *  if no data file or empty control or data file, just clean up
334 	 *  the empty control file must be 15 minutes old, to minimize the
335 	 *  chance of a race.
336 	 */
337 	d = dirstat(file(dp->name, 'D'));
338 	if(d == nil){
339 		syslog(0, runqlog, "no data file for %s", dp->name);
340 		remmatch(dp->name);
341 		return;
342 	}
343 	if(dp->length == 0){
344 		if(time(0)-dp->mtime > 15*60){
345 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
346 			remmatch(dp->name);
347 		}
348 		return;
349 	}
350 	dtime = d->mtime;
351 	free(d);
352 
353 	/*
354 	 *  retry times depend on the age of the errors file
355 	 */
356 	if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
357 		etime = d->mtime;
358 		free(d);
359 		if(etime - dtime < 15*60){
360 			/* up to the first 15 minutes, every 30 seconds */
361 			if(time(0) - etime < 30)
362 				return;
363 		} else if(etime - dtime < 60*60){
364 			/* up to the first hour, try every 15 minutes */
365 			if(time(0) - etime < 15*60)
366 				return;
367 		} else {
368 			/* after the first hour, try once an hour */
369 			if(time(0) - etime < 60*60)
370 				return;
371 		}
372 
373 	}
374 
375 	/*
376 	 *  open control and data
377 	 */
378 	b = sysopen(file(dp->name, 'C'), "rl", 0660);
379 	if(b == 0) {
380 		if(debug)
381 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
382 		return;
383 	}
384 	dfd = open(file(dp->name, 'D'), OREAD);
385 	if(dfd < 0){
386 		if(debug)
387 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
388 		Bterm(b);
389 		sysunlockfile(Bfildes(b));
390 		return;
391 	}
392 
393 	/*
394 	 *  make arg list
395 	 *	- read args into (malloc'd) buffer
396 	 *	- malloc a vector and copy pointers to args into it
397 	 */
398 	buf = malloc(dp->length+1);
399 	if(buf == 0){
400 		warning("buffer allocation", 0);
401 		Bterm(b);
402 		sysunlockfile(Bfildes(b));
403 		close(dfd);
404 		return;
405 	}
406 	if(Bread(b, buf, dp->length) != dp->length){
407 		warning("reading control file %s\n", dp->name);
408 		Bterm(b);
409 		sysunlockfile(Bfildes(b));
410 		close(dfd);
411 		free(buf);
412 		return;
413 	}
414 	buf[dp->length] = 0;
415 	av = malloc(2*sizeof(char*));
416 	if(av == 0){
417 		warning("argv allocation", 0);
418 		close(dfd);
419 		free(buf);
420 		Bterm(b);
421 		sysunlockfile(Bfildes(b));
422 		return;
423 	}
424 	for(ac = 1, cp = buf; *cp; ac++){
425 		while(isspace(*cp))
426 			*cp++ = 0;
427 		if(*cp == 0)
428 			break;
429 
430 		av = realloc(av, (ac+2)*sizeof(char*));
431 		if(av == 0){
432 			warning("argv allocation", 0);
433 			close(dfd);
434 			free(buf);
435 			Bterm(b);
436 			sysunlockfile(Bfildes(b));
437 			return;
438 		}
439 		av[ac] = cp;
440 		while(*cp && !isspace(*cp)){
441 			if(*cp++ == '"'){
442 				while(*cp && *cp != '"')
443 					cp++;
444 				if(*cp)
445 					cp++;
446 			}
447 		}
448 	}
449 	av[0] = cmd;
450 	av[ac] = 0;
451 
452 	if(!Eflag &&time(0) - dtime > giveup){
453 		if(returnmail(av, dp->name, "Giveup") != 0)
454 			logit("returnmail failed", dp->name, av);
455 		remmatch(dp->name);
456 		goto done;
457 	}
458 
459 	for(i = 0; i < nbad; i++){
460 		if(strcmp(av[3], badsys[i]) == 0)
461 			goto done;
462 	}
463 
464 	/*
465 	 * Ken's fs, for example, gives us 5 minutes of inactivity before
466 	 * the lock goes stale, so we have to keep reading it.
467  	 */
468 	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
469 
470 	/*
471 	 *  transfer
472 	 */
473 	pid = fork();
474 	switch(pid){
475 	case -1:
476 		sysunlock(l);
477 		sysunlockfile(Bfildes(b));
478 		syslog(0, runqlog, "out of procs");
479 		exits(0);
480 	case 0:
481 		if(debug) {
482 			fprint(2, "Starting %s", cmd);
483 			for(ac = 0; av[ac]; ac++)
484 				fprint(2, " %s", av[ac]);
485 			fprint(2, "\n");
486 		}
487 		logit("execing", dp->name, av);
488 		close(0);
489 		dup(dfd, 0);
490 		close(dfd);
491 		close(2);
492 		efd = open(file(dp->name, 'E'), OWRITE);
493 		if(efd < 0){
494 			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
495 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
496 			if(efd < 0){
497 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
498 				exits("could not open error file - Retry");
499 			}
500 		}
501 		seek(efd, 0, 2);
502 		exec(cmd, av);
503 		error("can't exec %s", cmd);
504 		break;
505 	default:
506 		for(;;){
507 			wm = wait();
508 			if(wm == nil)
509 				error("wait failed: %r", "");
510 			if(wm->pid == pid)
511 				break;
512 			free(wm);
513 		}
514 		if(debug)
515 			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
516 
517 		if(wm->msg[0]){
518 			if(debug)
519 				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
520 			syslog(0, runqlog, "message: %s\n", wm->msg);
521 			if(strstr(wm->msg, "Ignore") != nil){
522 				/* fix for fish/chips, leave message alone */
523 				logit("ignoring", dp->name, av);
524 			}else if(!Rflag && strstr(wm->msg, "Retry")==0){
525 				/* return the message and remove it */
526 				if(returnmail(av, dp->name, wm->msg) != 0)
527 					logit("returnmail failed", dp->name, av);
528 				remmatch(dp->name);
529 			} else {
530 				/* add sys to bad list and try again later */
531 				nbad++;
532 				badsys = realloc(badsys, nbad*sizeof(char*));
533 				badsys[nbad-1] = strdup(av[3]);
534 			}
535 		} else {
536 			/* it worked remove the message */
537 			remmatch(dp->name);
538 		}
539 		free(wm);
540 
541 	}
542 done:
543 	if (l)
544 		sysunlock(l);
545 	Bterm(b);
546 	sysunlockfile(Bfildes(b));
547 	free(buf);
548 	free(av);
549 	close(dfd);
550 }
551 
552 
553 /*
554  *  return a name starting with the given character
555  */
556 char*
file(char * name,char type)557 file(char *name, char type)
558 {
559 	static char nname[Elemlen+1];
560 
561 	strncpy(nname, name, Elemlen);
562 	nname[Elemlen] = 0;
563 	nname[0] = type;
564 	return nname;
565 }
566 
567 /*
568  *  send back the mail with an error message
569  *
570  *  return 0 if successful
571  */
572 int
returnmail(char ** av,char * name,char * msg)573 returnmail(char **av, char *name, char *msg)
574 {
575 	int pfd[2];
576 	Waitmsg *wm;
577 	int fd;
578 	char buf[256];
579 	char attachment[256];
580 	int i;
581 	long n;
582 	String *s;
583 	char *sender;
584 
585 	if(av[1] == 0 || av[2] == 0){
586 		logit("runq - dumping bad file", name, av);
587 		return 0;
588 	}
589 
590 	s = unescapespecial(s_copy(av[2]));
591 	sender = s_to_c(s);
592 
593 	if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
594 		logit("runq - dumping p to p mail", name, av);
595 		return 0;
596 	}
597 
598 	if(pipe(pfd) < 0){
599 		logit("runq - pipe failed", name, av);
600 		return -1;
601 	}
602 
603 	switch(rfork(RFFDG|RFPROC|RFENVG)){
604 	case -1:
605 		logit("runq - fork failed", name, av);
606 		return -1;
607 	case 0:
608 		logit("returning", name, av);
609 		close(pfd[1]);
610 		close(0);
611 		dup(pfd[0], 0);
612 		close(pfd[0]);
613 		putenv("upasname", "/dev/null");
614 		snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
615 		snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
616 		execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
617 		error("can't exec", 0);
618 		break;
619 	default:
620 		break;
621 	}
622 
623 	close(pfd[0]);
624 	fprint(pfd[1], "\n");	/* get out of headers */
625 	if(av[1]){
626 		fprint(pfd[1], "Your request ``%.20s ", av[1]);
627 		for(n = 3; av[n]; n++)
628 			fprint(pfd[1], "%s ", av[n]);
629 	}
630 	fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
631 	fd = open(file(name, 'E'), OREAD);
632 	if(fd >= 0){
633 		for(;;){
634 			n = read(fd, buf, sizeof(buf));
635 			if(n <= 0)
636 				break;
637 			if(write(pfd[1], buf, n) != n){
638 				close(fd);
639 				goto out;
640 			}
641 		}
642 		close(fd);
643 	}
644 	close(pfd[1]);
645 out:
646 	wm = wait();
647 	if(wm == nil){
648 		syslog(0, "runq", "wait: %r");
649 		logit("wait failed", name, av);
650 		return -1;
651 	}
652 	i = 0;
653 	if(wm->msg[0]){
654 		i = -1;
655 		syslog(0, "runq", "returnmail child: %s", wm->msg);
656 		logit("returnmail child failed", name, av);
657 	}
658 	free(wm);
659 	return i;
660 }
661 
662 /*
663  *  print a warning and continue
664  */
665 void
warning(char * f,void * a)666 warning(char *f, void *a)
667 {
668 	char err[65];
669 	char buf[256];
670 
671 	rerrstr(err, sizeof(err));
672 	snprint(buf, sizeof(buf), f, a);
673 	fprint(2, "runq: %s: %s\n", buf, err);
674 }
675 
676 /*
677  *  print an error and die
678  */
679 void
error(char * f,void * a)680 error(char *f, void *a)
681 {
682 	char err[Errlen];
683 	char buf[256];
684 
685 	rerrstr(err, sizeof(err));
686 	snprint(buf, sizeof(buf), f, a);
687 	fprint(2, "runq: %s: %s\n", buf, err);
688 	exits(buf);
689 }
690 
691 void
logit(char * msg,char * file,char ** av)692 logit(char *msg, char *file, char **av)
693 {
694 	int n, m;
695 	char buf[256];
696 
697 	n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
698 	for(; *av; av++){
699 		m = strlen(*av);
700 		if(n + m + 4 > sizeof(buf))
701 			break;
702 		sprint(buf + n, " '%s'", *av);
703 		n += m + 3;
704 	}
705 	syslog(0, runqlog, "%s", buf);
706 }
707 
708 char *loadfile = ".runqload";
709 
710 /*
711  *  load balancing
712  */
713 void
doload(int start)714 doload(int start)
715 {
716 	int fd;
717 	char buf[32];
718 	int i, n;
719 	Mlock *l;
720 	Dir *d;
721 
722 	if(load <= 0)
723 		return;
724 
725 	if(chdir(root) < 0){
726 		load = 0;
727 		return;
728 	}
729 
730 	l = syslock(loadfile);
731 	fd = open(loadfile, ORDWR);
732 	if(fd < 0){
733 		fd = create(loadfile, 0666, ORDWR);
734 		if(fd < 0){
735 			load = 0;
736 			sysunlock(l);
737 			return;
738 		}
739 	}
740 
741 	/* get current load */
742 	i = 0;
743 	n = read(fd, buf, sizeof(buf)-1);
744 	if(n >= 0){
745 		buf[n] = 0;
746 		i = atoi(buf);
747 	}
748 	if(i < 0)
749 		i = 0;
750 
751 	/* ignore load if file hasn't been changed in 30 minutes */
752 	d = dirfstat(fd);
753 	if(d != nil){
754 		if(d->mtime + 30*60 < time(0))
755 			i = 0;
756 		free(d);
757 	}
758 
759 	/* if load already too high, give up */
760 	if(start && i >= load){
761 		sysunlock(l);
762 		exits(0);
763 	}
764 
765 	/* increment/decrement load */
766 	if(start)
767 		i++;
768 	else
769 		i--;
770 	seek(fd, 0, 0);
771 	fprint(fd, "%d\n", i);
772 	sysunlock(l);
773 	close(fd);
774 }
775