xref: /inferno-os/appl/cmd/plumber.b (revision 37da2899f40661e3e9631e497da8dc59b971cbd0)
1implement Plumber;
2
3include "sys.m";
4	sys: Sys;
5
6include "draw.m";
7	draw: Draw;
8
9include "sh.m";
10
11include "regex.m";
12	regex: Regex;
13
14include "string.m";
15	str: String;
16
17include "../lib/plumbing.m";
18	plumbing: Plumbing;
19	Pattern, Rule: import plumbing;
20
21include "plumbmsg.m";
22	plumbmsg: Plumbmsg;
23	Msg, Attr: import plumbmsg;
24
25include "arg.m";
26
27Plumber: module
28{
29	init:	fn(ctxt: ref Draw->Context, argl: list of string);
30};
31
32Input: adt
33{
34	inc:		chan of ref Inmesg;
35	resc:		chan of int;
36	io:		ref Sys->FileIO;
37};
38
39Output: adt
40{
41	name:	string;
42	outc:		chan of string;
43	io:		ref Sys->FileIO;
44	queue:	list of array of byte;
45	started:	int;
46	startup:	string;
47	waiting:	int;
48};
49
50Port: adt
51{
52	name:		string;
53	startup:	string;
54	alwaysstart:	int;
55};
56
57Match: adt
58{
59	p0, p1:	int;
60};
61
62Inmesg: adt
63{
64	msg:		ref Msg;
65	text:		string;	# if kind is text
66	p0,p1:	int;
67	match:	array of Match;
68	port:		int;
69	startup:	string;
70	args:		list of string;
71	attrs:		list of ref Attr;
72	clearclick:	int;
73	set:		int;
74	# $ arguments
75	_n:		array of string;
76	_dir:		string;
77	_file:		string;
78};
79
80# Message status after processing
81HANDLED: con -1;
82UNKNOWN: con -2;
83NOTSTARTED: con -3;
84
85output: array of ref Output;
86
87input: ref Input;
88
89stderr: ref Sys->FD;
90pgrp: int;
91rules: list of ref Rule;
92titlectl: chan of string;
93ports: list of ref Port;
94wmstartup := 0;
95wmchan := "/chan/wm";
96verbose := 0;
97
98context: ref Draw->Context;
99
100usage()
101{
102	sys->fprint(stderr, "Usage: plumb [-vw] [-c wmchan] [initfile ...]\n");
103	raise "fail:usage";
104}
105
106init(ctxt: ref Draw->Context, args: list of string)
107{
108	context = ctxt;
109
110	sys = load Sys Sys->PATH;
111	draw = load Draw Draw->PATH;
112	stderr = sys->fildes(2);
113
114	regex = load Regex Regex->PATH;
115	plumbing = load Plumbing Plumbing->PATH;
116	str = load String String->PATH;
117
118	err: string;
119	nogrp := 0;
120
121	arg := load Arg Arg->PATH;
122	arg->init(args);
123	while ((opt := arg->opt()) != 0) {
124		case opt {
125		'w' =>
126			wmstartup = 1;
127		'c' =>
128			if ((wmchan = arg->arg()) == nil)
129				usage();
130		'v' =>
131			verbose = 1;
132		'n' =>
133			nogrp = 1;
134		* =>
135			usage();
136		}
137	}
138	args = arg->argv();
139	arg = nil;
140
141	(rules, err) = plumbing->init(regex, args);
142	if(err != nil){
143		sys->fprint(stderr, "plumb: %s\n", err);
144		raise "fail:init";
145	}
146
147	plumbmsg = load Plumbmsg Plumbmsg->PATH;
148	plumbmsg->init(0, nil, 0);
149
150	if(nogrp)
151		pgrp = sys->pctl(0, nil);
152	else
153		pgrp = sys->pctl(sys->NEWPGRP, nil);
154
155	r := rules;
156	for(i:=0; i<len rules; i++){
157		rule := hd r;
158		r = tl r;
159		for(j:=0; j<len rule.action; j++)
160			if(rule.action[j].pred == "to" || rule.action[j].pred == "alwaysstart"){
161				p := findport(rule.action[j].arg);
162				if(p == nil){
163					p = ref Port(rule.action[j].arg, nil, rule.action[j].pred == "alwaysstart");
164					ports = p :: ports;
165				}
166				for(k:=0; k<len rule.action; k++)
167					if(rule.action[k].pred == "start")
168						p.startup = rule.action[k].arg;
169				break;
170			}
171	}
172
173	input = ref Input;
174	input.io = makefile("plumb.input");
175	if(input.io == nil)
176		shutdown();
177	input.inc = chan of ref Inmesg;
178	input.resc = chan of int;
179	spawn receiver(input);
180
181	output = array[len ports] of ref Output;
182
183	pp := ports;
184	for(i=0; i<len output; i++){
185		p := hd pp;
186		pp = tl pp;
187		output[i] = ref Output;
188		output[i].name = p.name;
189		output[i].io = makefile("plumb."+p.name);
190		if(output[i].io == nil)
191			shutdown();
192		output[i].outc = chan of string;
193		output[i].started = 0;
194		output[i].startup = p.startup;
195		output[i].waiting = 0;
196	}
197
198	# spawn so we return without needing to run plumb in background
199	spawn sender(input, output);
200}
201
202findport(name: string): ref Port
203{
204	for(p:=ports; p!=nil; p=tl p)
205		if((hd p).name == name)
206			return hd p;
207	return nil;
208}
209
210makefile(file: string): ref Sys->FileIO
211{
212	io := sys->file2chan("/chan", file);
213	if(io == nil){
214		sys->fprint(stderr, "plumb: can't establish /chan/%s: %r\n", file);
215		return nil;
216	}
217	return io;
218}
219
220receiver(input: ref Input)
221{
222
223	for(;;){
224		(nil, msg, nil, wc) := <-input.io.write;
225		if(wc == nil)
226			;	# not interested in EOF; leave channel open
227		else{
228			input.inc <-= parse(msg);
229			res := <- input.resc;
230			err := "";
231			if(res == UNKNOWN)
232				err = "no matching plumb rule";
233			wc <-= (len msg, err);
234		}
235	}
236}
237
238sender(input: ref Input, output: array of ref Output)
239{
240	outputc := array[len output] of chan of (int, int, int, Sys->Rread);
241
242	for(;;){
243		alt{
244		in := <-input.inc =>
245			if(in == nil){
246				input.resc <-= HANDLED;
247				break;
248			}
249			(j, msg) := process(in);
250			case j {
251			HANDLED =>
252				break;
253			UNKNOWN =>
254				if(in.msg.src != "acme")
255					sys->fprint(stderr, "plumb: don't know who message goes to\n");
256			NOTSTARTED =>
257				sys->fprint(stderr, "plumb: can't start application\n");
258			* =>
259				output[j].queue = append(output[j].queue, msg);
260				outputc[j] = output[j].io.read;
261			}
262			input.resc <-= j;
263
264		(j, tmp) := <-outputc =>
265			(nil, nbytes, nil, rc) := tmp;
266			if(rc == nil)	# no interest in EOF
267				break;
268			msg := hd output[j].queue;
269			if(nbytes < len msg){
270				rc <-= (nil, "buffer too short for message");
271				break;
272			}
273			output[j].queue = tl output[j].queue;
274			if(output[j].queue == nil)
275				outputc[j] = nil;
276			rc <-= (msg, nil);
277		}
278	}
279}
280
281parse(a: array of byte): ref Inmesg
282{
283	msg := Msg.unpack(a);
284	if(msg == nil)
285		return nil;
286	i := ref Inmesg;
287	i.msg = msg;
288	if(msg.dst != nil){
289		if(control(i))
290			return nil;
291		toport(i, msg.dst);
292	}else
293		i.port = -1;
294	i.match = array[10] of { * => Match(-1, -1)};
295	i._n = array[10] of string;
296	i.attrs = plumbmsg->string2attrs(i.msg.attr);
297	return i;
298}
299
300append(l: list of array of byte, a: array of byte): list of array of byte
301{
302	if(l == nil)
303		return a :: nil;
304	return hd l :: append(tl l, a);
305}
306
307shutdown()
308{
309	fname := sys->sprint("#p/%d/ctl", pgrp);
310	if((fdesc := sys->open(fname, sys->OWRITE)) != nil)
311		sys->write(fdesc, array of byte "killgrp\n", 8);
312	raise "fail:error";
313}
314
315# Handle control messages
316control(in: ref Inmesg): int
317{
318	msg := in.msg;
319	if(msg.kind!="text" || msg.dst!="plumb")
320		return 0;
321	text := string msg.data;
322	case text {
323	"start" =>
324		start(msg.src, 1);
325	"stop" =>
326		start(msg.src, -1);
327	* =>
328		sys->fprint(stderr, "plumb: unrecognized control message from %s: %s\n", msg.src, text);
329	}
330	return 1;
331}
332
333start(port: string, startstop: int)
334{
335	for(i:=0; i<len output; i++)
336		if(port == output[i].name){
337			output[i].waiting = 0;
338			output[i].started += startstop;
339			return;
340		}
341	sys->fprint(stderr, "plumb: \"start\" message from unrecognized port %s\n", port);
342}
343
344startup(dir, prog: string, args: list of string, wait: chan of int)
345{
346	if(wmstartup){
347		fd := sys->open(wmchan, Sys->OWRITE);
348		if(fd != nil){
349			sys->fprint(fd, "s %s", str->quoted(dir :: prog :: args));
350			wait <-= 1;
351			return;
352		}
353	}
354
355	sys->pctl(Sys->NEWFD|Sys->NEWPGRP|Sys->FORKNS, list of {0, 1, 2});
356	wait <-= 1;
357	wait = nil;
358	mod := load Command prog;
359	if(mod == nil){
360		sys->fprint(stderr, "plumb: can't load %s: %r\n", prog);
361		return;
362	}
363	sys->chdir(dir);
364	mod->init(context, prog :: args);
365}
366
367# See if messages should be queued while waiting for program to connect
368shouldqueue(out: ref Output): int
369{
370	p := findport(out.name);
371	if(p == nil){
372		sys->fprint(stderr, "plumb: can't happen in shouldqueue\n");
373		return 0;
374	}
375	if(p.alwaysstart)
376		return 0;
377	return out.waiting;
378}
379
380# Determine destination of input message, reformat for output
381process(in: ref Inmesg): (int, array of byte)
382{
383	if(!clarify(in))
384		return (UNKNOWN, nil);
385	if(in.port < 0)
386		return (UNKNOWN, nil);
387	a := in.msg.pack();
388	j := in.port;
389	if(a == nil)
390		j = UNKNOWN;
391	else if(output[j].started==0 && !shouldqueue(output[j])){
392		path: string;
393		args: list of string;
394		if(in.startup!=nil){
395			path = macro(in, in.startup);
396			args = expand(in, in.args);
397		}else if(output[j].startup != nil){
398			path = output[j].startup;
399			args = in.text :: nil;
400		}else
401			return (NOTSTARTED, nil);
402		log(sys->sprint("start %s port %s\n", path, output[j].name));
403		wait := chan of int;
404		output[j].waiting = 1;
405		spawn startup(in.msg.dir, path, args, wait);
406		<-wait;
407		return (HANDLED, nil);
408	}else{
409		if(in.msg.kind != "text")
410			text := sys->sprint("message of type %s", in.msg.kind);
411		else{
412			text = in.text;
413			for(i:=0; i<len text; i++){
414				if(text[i]=='\n'){
415					text = text[0:i];
416					break;
417				}
418				if(i > 50) {
419					text = text[0:i]+"...";
420					break;
421				}
422			}
423		}
424		log(sys->sprint("send \"%s\" to %s", text, output[j].name));
425	}
426	return (j, a);
427}
428
429# expand $arguments
430expand(in: ref Inmesg, args: list of string): list of string
431{
432	a: list of string;
433	while(args != nil){
434		a = macro(in, hd args) :: a;
435		args = tl args;
436	}
437	while(a != nil){
438		args = hd a :: args;
439		a = tl a;
440	}
441	return args;
442}
443
444# resolve all ambiguities, fill in any missing fields
445clarify(in: ref Inmesg): int
446{
447	in.clearclick = 0;
448	in.set = 0;
449	msg := in.msg;
450	if(msg.kind != "text")
451		return 0;
452	in.text = string msg.data;
453	if(msg.dst != "")
454		return 1;
455	return dorules(in, rules);
456}
457
458dorules(in: ref Inmesg, rules: list of ref Rule): int
459{
460	if (verbose)
461		log("msg: " + inmesg2s(in));
462	for(r:=rules; r!=nil; r=tl r) {
463		if(matchrule(in, hd r)){
464			applyrule(in, hd r);
465			if (verbose)
466				log("yes");
467			return 1;
468		} else if (verbose)
469			log("no");
470	}
471	return 0;
472}
473
474inmesg2s(in: ref Inmesg): string
475{
476	m := in.msg;
477	s := sys->sprint("src=%s; dst=%s; dir=%s; kind=%s; attr='%s'",
478			m.src, m.dst, m.dir, m.kind, m.attr);
479	if (m.kind == "text")
480		s += "; data='" + string m.data + "'";
481	return s;
482}
483
484matchrule(in: ref Inmesg, r: ref Rule): int
485{
486	pats := r.pattern;
487	for(i:=0; i<len in.match; i++)
488		in.match[i] = (-1,-1);
489	# no rules at all implies success, so return if any fail
490	for(i=0; i<len pats; i++)
491		if(matchpattern(in, pats[i]) == 0)
492			return 0;
493	return 1;
494}
495
496applyrule(in: ref Inmesg, r: ref Rule)
497{
498	acts := r.action;
499	for(i:=0; i<len acts; i++)
500		applypattern(in, acts[i]);
501	if(in.clearclick){
502		al: list of ref Attr;
503		for(l:=in.attrs; l!=nil; l=tl l)
504			if((hd l).name != "click")
505				al = hd l :: al;
506		in.attrs = al;
507		in.msg.attr = plumbmsg->attrs2string(al);
508		if(in.set){
509			in.text = macro(in, "$0");
510			in.msg.data = array of byte in.text;
511		}
512	}
513}
514
515matchpattern(in: ref Inmesg, p: ref Pattern): int
516{
517	msg := in.msg;
518	text: string;
519	case p.field {
520	"src" =>	text = msg.src;
521	"dst" =>	text = msg.dst;
522	"dir" =>	text = msg.dir;
523	"kind" =>	text = msg.kind;
524	"attr" =>	text = msg.attr;
525	"data" =>	text = in.text;
526	* =>
527		sys->fprint(stderr, "plumb: don't recognize pattern field %s\n", p.field);
528		return 0;
529	}
530	if (verbose)
531		log(sys->sprint("'%s' %s '%s'\n", text, p.pred, p.arg));
532	case p.pred {
533	"is" =>
534		return text == p.arg;
535	"isfile" or "isdir" =>
536		text = p.arg;
537		if(p.expand)
538			text = macro(in, text);
539		if(len text == 0)
540			return 0;
541		if(len in.msg.dir!=0 && text[0] != '/' && text[0]!='#')
542			text = in.msg.dir+"/"+text;
543		text = cleanname(text);
544		(ok, dir) := sys->stat(text);
545		if(ok < 0)
546			return 0;
547		if(p.pred=="isfile" && (dir.mode&Sys->DMDIR)==0){
548			in._file = text;
549			return 1;
550		}
551		if(p.pred=="isdir" && (dir.mode&Sys->DMDIR)!=0){
552			in._dir = text;
553			return 1;
554		}
555		return 0;
556	"matches" =>
557		(clickspecified, val) := plumbmsg->lookup(in.attrs, "click");
558		if(p.field != "data")
559			clickspecified = 0;
560		if(!clickspecified){
561			# easy case. must match whole string
562			matches := regex->execute(p.regex, text);
563			if(matches == nil)
564				return 0;
565			(p0, p1) := matches[0];
566			if(p0!=0 || p1!=len text)
567				return 0;
568			in.match = matches;
569			setvars(in, text);
570			return 1;
571		}
572		matches := clickmatch(p.regex, text, int val);
573		if(matches == nil)
574			return 0;
575		(p0, p1) := matches[0];
576		# assumes all matches are in same sequence
577		if(in.match[0].p0 != -1)
578			return p0==in.match[0].p0 && p1==in.match[0].p1;
579		in.match = matches;
580		setvars(in, text);
581		in.clearclick = 1;
582		in.set = 1;
583		return 1;
584	"set" =>
585		text = p.arg;
586		if(p.expand)
587			text = macro(in, text);
588		case p.field {
589		"src" =>	msg.src = text;
590		"dst" =>	msg.dst = text;
591		"dir" =>	msg.dir = text;
592		"kind" =>	msg.kind = text;
593		"attr" =>	msg.attr = text;
594		"data" =>	in.text = text;
595				msg.data = array of byte text;
596				msg.kind = "text";
597				in.set = 0;
598		}
599		return 1;
600	* =>
601		sys->fprint(stderr, "plumb: don't recognize pattern predicate %s\n", p.pred);
602	}
603	return 0;
604}
605
606applypattern(in: ref Inmesg, p: ref Pattern): int
607{
608	if(p.field != "plumb"){
609		sys->fprint(stderr, "plumb: don't recognize action field %s\n", p.field);
610		return 0;
611	}
612	case p.pred {
613	"to" or "alwaysstart" =>
614		if(in.port >= 0)	# already specified
615			return 1;
616		toport(in, p.arg);
617	"start" =>
618		in.startup = p.arg;
619		in.args = p.extra;
620	* =>
621		sys->fprint(stderr, "plumb: don't recognize action %s\n", p.pred);
622	}
623	return 1;
624}
625
626toport(in: ref Inmesg, name: string): int
627{
628	for(i:=0; i<len output; i++)
629		if(name == output[i].name){
630			in.msg.dst = name;
631			in.port = i;
632			return i;
633		}
634	in.port = -1;
635	sys->fprint(stderr, "plumb: unrecognized port %s\n", name);
636	return -1;
637}
638
639# simple heuristic: look for leftmost match that reaches click position
640clickmatch(re: ref Regex->Arena, text: string, click: int): array of Match
641{
642	for(i:=0; i<=click && i < len text; i++){
643		matches := regex->executese(re, text, (i, -1), i == 0, 1);
644		if(matches == nil)
645			continue;
646		(p0, p1) := matches[0];
647
648		if(p0>=i && p1>=click)
649			return matches;
650	}
651	return nil;
652}
653
654setvars(in: ref Inmesg, text: string)
655{
656	for(i:=0; i<len in.match && in.match[i].p0>=0; i++)
657		in._n[i] = text[in.match[i].p0:in.match[i].p1];
658	for(; i<len in._n; i++)
659		in._n[i] = "";
660}
661
662macro(in: ref Inmesg, text: string): string
663{
664	word := "";
665	i := 0;
666	j := 0;
667	for(;;){
668		if(i == len text)
669			break;
670		if(text[i++] != '$')
671			continue;
672		if(i == len text)
673			break;
674		word += text[j:i-1];
675		(res, skip) := dollar(in, text[i:]);
676		word += res;
677		i += skip;
678		j = i;
679	}
680	if(j < len text)
681		word += text[j:];
682	return word;
683}
684
685dollar(in: ref Inmesg, text: string): (string, int)
686{
687	if(text[0] == '$')
688		return ("$", 1);
689	if('0'<=text[0] && text[0]<='9')
690		return (in._n[text[0]-'0'], 1);
691	if(len text < 3)
692		return ("$", 0);
693	case text[0:3] {
694	"src" =>	return (in.msg.src, 3);
695	"dst" =>	return (in.msg.dst, 3);
696	"dir" =>	return (in._dir, 3);
697	}
698	if(len text< 4)
699		return ("$", 0);
700	case text[0:4] {
701	"attr" =>	return (in.msg.attr, 4);
702	"data" =>	return (in.text, 4);
703	"file" =>	return (in._file, 4);
704	"kind" =>	return (in.msg.kind, 4);
705	}
706	return ("$", 0);
707}
708
709# compress ../ references and do other cleanups
710cleanname(name: string): string
711{
712	# compress multiple slashes
713	n := len name;
714	for(i:=0; i<n-1; i++)
715		if(name[i]=='/' && name[i+1]=='/'){
716			name = name[0:i]+name[i+1:];
717			--i;
718			n--;
719		}
720	#  eliminate ./
721	for(i=0; i<n-1; i++)
722		if(name[i]=='.' && name[i+1]=='/' && (i==0 || name[i-1]=='/')){
723			name = name[0:i]+name[i+2:];
724			--i;
725			n -= 2;
726		}
727	found: int;
728	do{
729		# compress xx/..
730		found = 0;
731		for(i=1; i<=n-3; i++)
732			if(name[i:i+3] == "/.."){
733				if(i==n-3 || name[i+3]=='/'){
734					found = 1;
735					break;
736				}
737			}
738		if(found)
739			for(j:=i-1; j>=0; --j)
740				if(j==0 || name[j-1]=='/'){
741					i += 3;		# character beyond ..
742					if(i<n && name[i]=='/')
743						++i;
744					name = name[0:j]+name[i:];
745					n -= (i-j);
746					break;
747				}
748	}while(found);
749	# eliminate trailing .
750	if(n>=2 && name[n-2]=='/' && name[n-1]=='.')
751		--n;
752	if(n == 0)
753		return ".";
754	if(n != len name)
755		name = name[0:n];
756	return name;
757}
758
759log(s: string)
760{
761	if(len s == 0)
762		return;
763	if(s[len s-1] != '\n')
764		s[len s] = '\n';
765	sys->print("plumb: %s", s);
766}
767