1implement Plumber; 2 3include "sys.m"; 4 sys: Sys; 5 6include "draw.m"; 7 draw: Draw; 8 9include "sh.m"; 10 11include "regex.m"; 12 regex: Regex; 13 14include "string.m"; 15 str: String; 16 17include "../lib/plumbing.m"; 18 plumbing: Plumbing; 19 Pattern, Rule: import plumbing; 20 21include "plumbmsg.m"; 22 plumbmsg: Plumbmsg; 23 Msg, Attr: import plumbmsg; 24 25include "arg.m"; 26 27Plumber: module 28{ 29 init: fn(ctxt: ref Draw->Context, argl: list of string); 30}; 31 32Input: adt 33{ 34 inc: chan of ref Inmesg; 35 resc: chan of int; 36 io: ref Sys->FileIO; 37}; 38 39Output: adt 40{ 41 name: string; 42 outc: chan of string; 43 io: ref Sys->FileIO; 44 queue: list of array of byte; 45 started: int; 46 startup: string; 47 waiting: int; 48}; 49 50Port: adt 51{ 52 name: string; 53 startup: string; 54 alwaysstart: int; 55}; 56 57Match: adt 58{ 59 p0, p1: int; 60}; 61 62Inmesg: adt 63{ 64 msg: ref Msg; 65 text: string; # if kind is text 66 p0,p1: int; 67 match: array of Match; 68 port: int; 69 startup: string; 70 args: list of string; 71 attrs: list of ref Attr; 72 clearclick: int; 73 set: int; 74 # $ arguments 75 _n: array of string; 76 _dir: string; 77 _file: string; 78}; 79 80# Message status after processing 81HANDLED: con -1; 82UNKNOWN: con -2; 83NOTSTARTED: con -3; 84 85output: array of ref Output; 86 87input: ref Input; 88 89stderr: ref Sys->FD; 90pgrp: int; 91rules: list of ref Rule; 92titlectl: chan of string; 93ports: list of ref Port; 94wmstartup := 0; 95wmchan := "/chan/wm"; 96verbose := 0; 97 98context: ref Draw->Context; 99 100usage() 101{ 102 sys->fprint(stderr, "Usage: plumb [-vw] [-c wmchan] [initfile ...]\n"); 103 raise "fail:usage"; 104} 105 106init(ctxt: ref Draw->Context, args: list of string) 107{ 108 context = ctxt; 109 110 sys = load Sys Sys->PATH; 111 draw = load Draw Draw->PATH; 112 stderr = sys->fildes(2); 113 114 regex = load Regex Regex->PATH; 115 plumbing = load Plumbing Plumbing->PATH; 116 str = load String String->PATH; 117 118 err: string; 119 nogrp := 0; 120 121 arg := load Arg Arg->PATH; 122 arg->init(args); 123 while ((opt := arg->opt()) != 0) { 124 case opt { 125 'w' => 126 wmstartup = 1; 127 'c' => 128 if ((wmchan = arg->arg()) == nil) 129 usage(); 130 'v' => 131 verbose = 1; 132 'n' => 133 nogrp = 1; 134 * => 135 usage(); 136 } 137 } 138 args = arg->argv(); 139 arg = nil; 140 141 (rules, err) = plumbing->init(regex, args); 142 if(err != nil){ 143 sys->fprint(stderr, "plumb: %s\n", err); 144 raise "fail:init"; 145 } 146 147 plumbmsg = load Plumbmsg Plumbmsg->PATH; 148 plumbmsg->init(0, nil, 0); 149 150 if(nogrp) 151 pgrp = sys->pctl(0, nil); 152 else 153 pgrp = sys->pctl(sys->NEWPGRP, nil); 154 155 r := rules; 156 for(i:=0; i<len rules; i++){ 157 rule := hd r; 158 r = tl r; 159 for(j:=0; j<len rule.action; j++) 160 if(rule.action[j].pred == "to" || rule.action[j].pred == "alwaysstart"){ 161 p := findport(rule.action[j].arg); 162 if(p == nil){ 163 p = ref Port(rule.action[j].arg, nil, rule.action[j].pred == "alwaysstart"); 164 ports = p :: ports; 165 } 166 for(k:=0; k<len rule.action; k++) 167 if(rule.action[k].pred == "start") 168 p.startup = rule.action[k].arg; 169 break; 170 } 171 } 172 173 input = ref Input; 174 input.io = makefile("plumb.input"); 175 if(input.io == nil) 176 shutdown(); 177 input.inc = chan of ref Inmesg; 178 input.resc = chan of int; 179 spawn receiver(input); 180 181 output = array[len ports] of ref Output; 182 183 pp := ports; 184 for(i=0; i<len output; i++){ 185 p := hd pp; 186 pp = tl pp; 187 output[i] = ref Output; 188 output[i].name = p.name; 189 output[i].io = makefile("plumb."+p.name); 190 if(output[i].io == nil) 191 shutdown(); 192 output[i].outc = chan of string; 193 output[i].started = 0; 194 output[i].startup = p.startup; 195 output[i].waiting = 0; 196 } 197 198 # spawn so we return without needing to run plumb in background 199 spawn sender(input, output); 200} 201 202findport(name: string): ref Port 203{ 204 for(p:=ports; p!=nil; p=tl p) 205 if((hd p).name == name) 206 return hd p; 207 return nil; 208} 209 210makefile(file: string): ref Sys->FileIO 211{ 212 io := sys->file2chan("/chan", file); 213 if(io == nil){ 214 sys->fprint(stderr, "plumb: can't establish /chan/%s: %r\n", file); 215 return nil; 216 } 217 return io; 218} 219 220receiver(input: ref Input) 221{ 222 223 for(;;){ 224 (nil, msg, nil, wc) := <-input.io.write; 225 if(wc == nil) 226 ; # not interested in EOF; leave channel open 227 else{ 228 input.inc <-= parse(msg); 229 res := <- input.resc; 230 err := ""; 231 if(res == UNKNOWN) 232 err = "no matching plumb rule"; 233 wc <-= (len msg, err); 234 } 235 } 236} 237 238sender(input: ref Input, output: array of ref Output) 239{ 240 outputc := array[len output] of chan of (int, int, int, Sys->Rread); 241 242 for(;;){ 243 alt{ 244 in := <-input.inc => 245 if(in == nil){ 246 input.resc <-= HANDLED; 247 break; 248 } 249 (j, msg) := process(in); 250 case j { 251 HANDLED => 252 break; 253 UNKNOWN => 254 if(in.msg.src != "acme") 255 sys->fprint(stderr, "plumb: don't know who message goes to\n"); 256 NOTSTARTED => 257 sys->fprint(stderr, "plumb: can't start application\n"); 258 * => 259 output[j].queue = append(output[j].queue, msg); 260 outputc[j] = output[j].io.read; 261 } 262 input.resc <-= j; 263 264 (j, tmp) := <-outputc => 265 (nil, nbytes, nil, rc) := tmp; 266 if(rc == nil) # no interest in EOF 267 break; 268 msg := hd output[j].queue; 269 if(nbytes < len msg){ 270 rc <-= (nil, "buffer too short for message"); 271 break; 272 } 273 output[j].queue = tl output[j].queue; 274 if(output[j].queue == nil) 275 outputc[j] = nil; 276 rc <-= (msg, nil); 277 } 278 } 279} 280 281parse(a: array of byte): ref Inmesg 282{ 283 msg := Msg.unpack(a); 284 if(msg == nil) 285 return nil; 286 i := ref Inmesg; 287 i.msg = msg; 288 if(msg.dst != nil){ 289 if(control(i)) 290 return nil; 291 toport(i, msg.dst); 292 }else 293 i.port = -1; 294 i.match = array[10] of { * => Match(-1, -1)}; 295 i._n = array[10] of string; 296 i.attrs = plumbmsg->string2attrs(i.msg.attr); 297 return i; 298} 299 300append(l: list of array of byte, a: array of byte): list of array of byte 301{ 302 if(l == nil) 303 return a :: nil; 304 return hd l :: append(tl l, a); 305} 306 307shutdown() 308{ 309 fname := sys->sprint("#p/%d/ctl", pgrp); 310 if((fdesc := sys->open(fname, sys->OWRITE)) != nil) 311 sys->write(fdesc, array of byte "killgrp\n", 8); 312 raise "fail:error"; 313} 314 315# Handle control messages 316control(in: ref Inmesg): int 317{ 318 msg := in.msg; 319 if(msg.kind!="text" || msg.dst!="plumb") 320 return 0; 321 text := string msg.data; 322 case text { 323 "start" => 324 start(msg.src, 1); 325 "stop" => 326 start(msg.src, -1); 327 * => 328 sys->fprint(stderr, "plumb: unrecognized control message from %s: %s\n", msg.src, text); 329 } 330 return 1; 331} 332 333start(port: string, startstop: int) 334{ 335 for(i:=0; i<len output; i++) 336 if(port == output[i].name){ 337 output[i].waiting = 0; 338 output[i].started += startstop; 339 return; 340 } 341 sys->fprint(stderr, "plumb: \"start\" message from unrecognized port %s\n", port); 342} 343 344startup(dir, prog: string, args: list of string, wait: chan of int) 345{ 346 if(wmstartup){ 347 fd := sys->open(wmchan, Sys->OWRITE); 348 if(fd != nil){ 349 sys->fprint(fd, "s %s", str->quoted(dir :: prog :: args)); 350 wait <-= 1; 351 return; 352 } 353 } 354 355 sys->pctl(Sys->NEWFD|Sys->NEWPGRP|Sys->FORKNS, list of {0, 1, 2}); 356 wait <-= 1; 357 wait = nil; 358 mod := load Command prog; 359 if(mod == nil){ 360 sys->fprint(stderr, "plumb: can't load %s: %r\n", prog); 361 return; 362 } 363 sys->chdir(dir); 364 mod->init(context, prog :: args); 365} 366 367# See if messages should be queued while waiting for program to connect 368shouldqueue(out: ref Output): int 369{ 370 p := findport(out.name); 371 if(p == nil){ 372 sys->fprint(stderr, "plumb: can't happen in shouldqueue\n"); 373 return 0; 374 } 375 if(p.alwaysstart) 376 return 0; 377 return out.waiting; 378} 379 380# Determine destination of input message, reformat for output 381process(in: ref Inmesg): (int, array of byte) 382{ 383 if(!clarify(in)) 384 return (UNKNOWN, nil); 385 if(in.port < 0) 386 return (UNKNOWN, nil); 387 a := in.msg.pack(); 388 j := in.port; 389 if(a == nil) 390 j = UNKNOWN; 391 else if(output[j].started==0 && !shouldqueue(output[j])){ 392 path: string; 393 args: list of string; 394 if(in.startup!=nil){ 395 path = macro(in, in.startup); 396 args = expand(in, in.args); 397 }else if(output[j].startup != nil){ 398 path = output[j].startup; 399 args = in.text :: nil; 400 }else 401 return (NOTSTARTED, nil); 402 log(sys->sprint("start %s port %s\n", path, output[j].name)); 403 wait := chan of int; 404 output[j].waiting = 1; 405 spawn startup(in.msg.dir, path, args, wait); 406 <-wait; 407 return (HANDLED, nil); 408 }else{ 409 if(in.msg.kind != "text") 410 text := sys->sprint("message of type %s", in.msg.kind); 411 else{ 412 text = in.text; 413 for(i:=0; i<len text; i++){ 414 if(text[i]=='\n'){ 415 text = text[0:i]; 416 break; 417 } 418 if(i > 50) { 419 text = text[0:i]+"..."; 420 break; 421 } 422 } 423 } 424 log(sys->sprint("send \"%s\" to %s", text, output[j].name)); 425 } 426 return (j, a); 427} 428 429# expand $arguments 430expand(in: ref Inmesg, args: list of string): list of string 431{ 432 a: list of string; 433 while(args != nil){ 434 a = macro(in, hd args) :: a; 435 args = tl args; 436 } 437 while(a != nil){ 438 args = hd a :: args; 439 a = tl a; 440 } 441 return args; 442} 443 444# resolve all ambiguities, fill in any missing fields 445clarify(in: ref Inmesg): int 446{ 447 in.clearclick = 0; 448 in.set = 0; 449 msg := in.msg; 450 if(msg.kind != "text") 451 return 0; 452 in.text = string msg.data; 453 if(msg.dst != "") 454 return 1; 455 return dorules(in, rules); 456} 457 458dorules(in: ref Inmesg, rules: list of ref Rule): int 459{ 460 if (verbose) 461 log("msg: " + inmesg2s(in)); 462 for(r:=rules; r!=nil; r=tl r) { 463 if(matchrule(in, hd r)){ 464 applyrule(in, hd r); 465 if (verbose) 466 log("yes"); 467 return 1; 468 } else if (verbose) 469 log("no"); 470 } 471 return 0; 472} 473 474inmesg2s(in: ref Inmesg): string 475{ 476 m := in.msg; 477 s := sys->sprint("src=%s; dst=%s; dir=%s; kind=%s; attr='%s'", 478 m.src, m.dst, m.dir, m.kind, m.attr); 479 if (m.kind == "text") 480 s += "; data='" + string m.data + "'"; 481 return s; 482} 483 484matchrule(in: ref Inmesg, r: ref Rule): int 485{ 486 pats := r.pattern; 487 for(i:=0; i<len in.match; i++) 488 in.match[i] = (-1,-1); 489 # no rules at all implies success, so return if any fail 490 for(i=0; i<len pats; i++) 491 if(matchpattern(in, pats[i]) == 0) 492 return 0; 493 return 1; 494} 495 496applyrule(in: ref Inmesg, r: ref Rule) 497{ 498 acts := r.action; 499 for(i:=0; i<len acts; i++) 500 applypattern(in, acts[i]); 501 if(in.clearclick){ 502 al: list of ref Attr; 503 for(l:=in.attrs; l!=nil; l=tl l) 504 if((hd l).name != "click") 505 al = hd l :: al; 506 in.attrs = al; 507 in.msg.attr = plumbmsg->attrs2string(al); 508 if(in.set){ 509 in.text = macro(in, "$0"); 510 in.msg.data = array of byte in.text; 511 } 512 } 513} 514 515matchpattern(in: ref Inmesg, p: ref Pattern): int 516{ 517 msg := in.msg; 518 text: string; 519 case p.field { 520 "src" => text = msg.src; 521 "dst" => text = msg.dst; 522 "dir" => text = msg.dir; 523 "kind" => text = msg.kind; 524 "attr" => text = msg.attr; 525 "data" => text = in.text; 526 * => 527 sys->fprint(stderr, "plumb: don't recognize pattern field %s\n", p.field); 528 return 0; 529 } 530 if (verbose) 531 log(sys->sprint("'%s' %s '%s'\n", text, p.pred, p.arg)); 532 case p.pred { 533 "is" => 534 return text == p.arg; 535 "isfile" or "isdir" => 536 text = p.arg; 537 if(p.expand) 538 text = macro(in, text); 539 if(len text == 0) 540 return 0; 541 if(len in.msg.dir!=0 && text[0] != '/' && text[0]!='#') 542 text = in.msg.dir+"/"+text; 543 text = cleanname(text); 544 (ok, dir) := sys->stat(text); 545 if(ok < 0) 546 return 0; 547 if(p.pred=="isfile" && (dir.mode&Sys->DMDIR)==0){ 548 in._file = text; 549 return 1; 550 } 551 if(p.pred=="isdir" && (dir.mode&Sys->DMDIR)!=0){ 552 in._dir = text; 553 return 1; 554 } 555 return 0; 556 "matches" => 557 (clickspecified, val) := plumbmsg->lookup(in.attrs, "click"); 558 if(p.field != "data") 559 clickspecified = 0; 560 if(!clickspecified){ 561 # easy case. must match whole string 562 matches := regex->execute(p.regex, text); 563 if(matches == nil) 564 return 0; 565 (p0, p1) := matches[0]; 566 if(p0!=0 || p1!=len text) 567 return 0; 568 in.match = matches; 569 setvars(in, text); 570 return 1; 571 } 572 matches := clickmatch(p.regex, text, int val); 573 if(matches == nil) 574 return 0; 575 (p0, p1) := matches[0]; 576 # assumes all matches are in same sequence 577 if(in.match[0].p0 != -1) 578 return p0==in.match[0].p0 && p1==in.match[0].p1; 579 in.match = matches; 580 setvars(in, text); 581 in.clearclick = 1; 582 in.set = 1; 583 return 1; 584 "set" => 585 text = p.arg; 586 if(p.expand) 587 text = macro(in, text); 588 case p.field { 589 "src" => msg.src = text; 590 "dst" => msg.dst = text; 591 "dir" => msg.dir = text; 592 "kind" => msg.kind = text; 593 "attr" => msg.attr = text; 594 "data" => in.text = text; 595 msg.data = array of byte text; 596 msg.kind = "text"; 597 in.set = 0; 598 } 599 return 1; 600 * => 601 sys->fprint(stderr, "plumb: don't recognize pattern predicate %s\n", p.pred); 602 } 603 return 0; 604} 605 606applypattern(in: ref Inmesg, p: ref Pattern): int 607{ 608 if(p.field != "plumb"){ 609 sys->fprint(stderr, "plumb: don't recognize action field %s\n", p.field); 610 return 0; 611 } 612 case p.pred { 613 "to" or "alwaysstart" => 614 if(in.port >= 0) # already specified 615 return 1; 616 toport(in, p.arg); 617 "start" => 618 in.startup = p.arg; 619 in.args = p.extra; 620 * => 621 sys->fprint(stderr, "plumb: don't recognize action %s\n", p.pred); 622 } 623 return 1; 624} 625 626toport(in: ref Inmesg, name: string): int 627{ 628 for(i:=0; i<len output; i++) 629 if(name == output[i].name){ 630 in.msg.dst = name; 631 in.port = i; 632 return i; 633 } 634 in.port = -1; 635 sys->fprint(stderr, "plumb: unrecognized port %s\n", name); 636 return -1; 637} 638 639# simple heuristic: look for leftmost match that reaches click position 640clickmatch(re: ref Regex->Arena, text: string, click: int): array of Match 641{ 642 for(i:=0; i<=click && i < len text; i++){ 643 matches := regex->executese(re, text, (i, -1), i == 0, 1); 644 if(matches == nil) 645 continue; 646 (p0, p1) := matches[0]; 647 648 if(p0>=i && p1>=click) 649 return matches; 650 } 651 return nil; 652} 653 654setvars(in: ref Inmesg, text: string) 655{ 656 for(i:=0; i<len in.match && in.match[i].p0>=0; i++) 657 in._n[i] = text[in.match[i].p0:in.match[i].p1]; 658 for(; i<len in._n; i++) 659 in._n[i] = ""; 660} 661 662macro(in: ref Inmesg, text: string): string 663{ 664 word := ""; 665 i := 0; 666 j := 0; 667 for(;;){ 668 if(i == len text) 669 break; 670 if(text[i++] != '$') 671 continue; 672 if(i == len text) 673 break; 674 word += text[j:i-1]; 675 (res, skip) := dollar(in, text[i:]); 676 word += res; 677 i += skip; 678 j = i; 679 } 680 if(j < len text) 681 word += text[j:]; 682 return word; 683} 684 685dollar(in: ref Inmesg, text: string): (string, int) 686{ 687 if(text[0] == '$') 688 return ("$", 1); 689 if('0'<=text[0] && text[0]<='9') 690 return (in._n[text[0]-'0'], 1); 691 if(len text < 3) 692 return ("$", 0); 693 case text[0:3] { 694 "src" => return (in.msg.src, 3); 695 "dst" => return (in.msg.dst, 3); 696 "dir" => return (in._dir, 3); 697 } 698 if(len text< 4) 699 return ("$", 0); 700 case text[0:4] { 701 "attr" => return (in.msg.attr, 4); 702 "data" => return (in.text, 4); 703 "file" => return (in._file, 4); 704 "kind" => return (in.msg.kind, 4); 705 } 706 return ("$", 0); 707} 708 709# compress ../ references and do other cleanups 710cleanname(name: string): string 711{ 712 # compress multiple slashes 713 n := len name; 714 for(i:=0; i<n-1; i++) 715 if(name[i]=='/' && name[i+1]=='/'){ 716 name = name[0:i]+name[i+1:]; 717 --i; 718 n--; 719 } 720 # eliminate ./ 721 for(i=0; i<n-1; i++) 722 if(name[i]=='.' && name[i+1]=='/' && (i==0 || name[i-1]=='/')){ 723 name = name[0:i]+name[i+2:]; 724 --i; 725 n -= 2; 726 } 727 found: int; 728 do{ 729 # compress xx/.. 730 found = 0; 731 for(i=1; i<=n-3; i++) 732 if(name[i:i+3] == "/.."){ 733 if(i==n-3 || name[i+3]=='/'){ 734 found = 1; 735 break; 736 } 737 } 738 if(found) 739 for(j:=i-1; j>=0; --j) 740 if(j==0 || name[j-1]=='/'){ 741 i += 3; # character beyond .. 742 if(i<n && name[i]=='/') 743 ++i; 744 name = name[0:j]+name[i:]; 745 n -= (i-j); 746 break; 747 } 748 }while(found); 749 # eliminate trailing . 750 if(n>=2 && name[n-2]=='/' && name[n-1]=='.') 751 --n; 752 if(n == 0) 753 return "."; 754 if(n != len name) 755 name = name[0:n]; 756 return name; 757} 758 759log(s: string) 760{ 761 if(len s == 0) 762 return; 763 if(s[len s-1] != '\n') 764 s[len s] = '\n'; 765 sys->print("plumb: %s", s); 766} 767