1 #include "stdinc.h" 2 3 #include "9.h" 4 #include "dat.h" 5 #include "fns.h" 6 7 enum { 8 NConInit = 128, 9 NMsgInit = 384, 10 NMsgProcInit = 64, 11 NMsizeInit = 8192+IOHDRSZ, 12 }; 13 14 static struct { 15 VtLock* alock; /* alloc */ 16 Msg* ahead; 17 VtRendez* arendez; 18 19 int maxmsg; 20 int nmsg; 21 int nmsgstarve; 22 23 VtLock* rlock; /* read */ 24 Msg* rhead; 25 Msg* rtail; 26 VtRendez* rrendez; 27 28 int maxproc; 29 int nproc; 30 int nprocstarve; 31 32 u32int msize; /* immutable */ 33 } mbox; 34 35 static struct { 36 VtLock* alock; /* alloc */ 37 Con* ahead; 38 VtRendez* arendez; 39 40 VtLock* clock; 41 Con* chead; 42 Con* ctail; 43 44 int maxcon; 45 int ncon; 46 int nconstarve; 47 48 u32int msize; 49 } cbox; 50 51 static void 52 conFree(Con* con) 53 { 54 assert(con->version == nil); 55 assert(con->mhead == nil); 56 assert(con->whead == nil); 57 assert(con->nfid == 0); 58 assert(con->state == ConMoribund); 59 60 if(con->fd >= 0){ 61 close(con->fd); 62 con->fd = -1; 63 } 64 con->state = ConDead; 65 con->aok = 0; 66 con->flags = 0; 67 con->isconsole = 0; 68 69 vtLock(cbox.alock); 70 if(con->cprev != nil) 71 con->cprev->cnext = con->cnext; 72 else 73 cbox.chead = con->cnext; 74 if(con->cnext != nil) 75 con->cnext->cprev = con->cprev; 76 else 77 cbox.ctail = con->cprev; 78 con->cprev = con->cnext = nil; 79 80 if(cbox.ncon > cbox.maxcon){ 81 if(con->name != nil) 82 vtMemFree(con->name); 83 vtLockFree(con->fidlock); 84 vtMemFree(con->data); 85 vtRendezFree(con->wrendez); 86 vtLockFree(con->wlock); 87 vtRendezFree(con->mrendez); 88 vtLockFree(con->mlock); 89 vtRendezFree(con->rendez); 90 vtLockFree(con->lock); 91 vtMemFree(con); 92 cbox.ncon--; 93 vtUnlock(cbox.alock); 94 return; 95 } 96 con->anext = cbox.ahead; 97 cbox.ahead = con; 98 if(con->anext == nil) 99 vtWakeup(cbox.arendez); 100 vtUnlock(cbox.alock); 101 } 102 103 static void 104 msgFree(Msg* m) 105 { 106 assert(m->rwnext == nil); 107 assert(m->flush == nil); 108 109 vtLock(mbox.alock); 110 if(mbox.nmsg > mbox.maxmsg){ 111 vtMemFree(m->data); 112 vtMemFree(m); 113 mbox.nmsg--; 114 vtUnlock(mbox.alock); 115 return; 116 } 117 m->anext = mbox.ahead; 118 mbox.ahead = m; 119 if(m->anext == nil) 120 vtWakeup(mbox.arendez); 121 vtUnlock(mbox.alock); 122 } 123 124 static Msg* 125 msgAlloc(Con* con) 126 { 127 Msg *m; 128 129 vtLock(mbox.alock); 130 while(mbox.ahead == nil){ 131 if(mbox.nmsg >= mbox.maxmsg){ 132 mbox.nmsgstarve++; 133 vtSleep(mbox.arendez); 134 continue; 135 } 136 m = vtMemAllocZ(sizeof(Msg)); 137 m->data = vtMemAlloc(mbox.msize); 138 m->msize = mbox.msize; 139 mbox.nmsg++; 140 mbox.ahead = m; 141 break; 142 } 143 m = mbox.ahead; 144 mbox.ahead = m->anext; 145 m->anext = nil; 146 vtUnlock(mbox.alock); 147 148 m->con = con; 149 m->state = MsgR; 150 m->nowq = 0; 151 152 return m; 153 } 154 155 static void 156 msgMunlink(Msg* m) 157 { 158 Con *con; 159 160 con = m->con; 161 162 if(m->mprev != nil) 163 m->mprev->mnext = m->mnext; 164 else 165 con->mhead = m->mnext; 166 if(m->mnext != nil) 167 m->mnext->mprev = m->mprev; 168 else 169 con->mtail = m->mprev; 170 m->mprev = m->mnext = nil; 171 } 172 173 void 174 msgFlush(Msg* m) 175 { 176 Con *con; 177 Msg *flush, *old; 178 179 con = m->con; 180 181 if(Dflag) 182 fprint(2, "msgFlush %F\n", &m->t); 183 184 /* 185 * If this Tflush has been flushed, nothing to do. 186 * Look for the message to be flushed in the 187 * queue of all messages still on this connection. 188 * If it's not found must assume Elvis has already 189 * left the building and reply normally. 190 */ 191 vtLock(con->mlock); 192 if(m->state == MsgF){ 193 vtUnlock(con->mlock); 194 return; 195 } 196 for(old = con->mhead; old != nil; old = old->mnext) 197 if(old->t.tag == m->t.oldtag) 198 break; 199 if(old == nil){ 200 if(Dflag) 201 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag); 202 vtUnlock(con->mlock); 203 return; 204 } 205 206 if(Dflag) 207 fprint(2, "\tmsgFlush found %F\n", &old->t); 208 209 /* 210 * Found it. 211 * There are two cases where the old message can be 212 * truly flushed and no reply to the original message given. 213 * The first is when the old message is in MsgR state; no 214 * processing has been done yet and it is still on the read 215 * queue. The second is if old is a Tflush, which doesn't 216 * affect the server state. In both cases, put the old 217 * message into MsgF state and let MsgWrite toss it after 218 * pulling it off the queue. 219 */ 220 if(old->state == MsgR || old->t.type == Tflush){ 221 old->state = MsgF; 222 if(Dflag) 223 fprint(2, "msgFlush: change %d from MsgR to MsgF\n", 224 m->t.oldtag); 225 } 226 227 /* 228 * Link this flush message and the old message 229 * so multiple flushes can be coalesced (if there are 230 * multiple Tflush messages for a particular pending 231 * request, it is only necessary to respond to the last 232 * one, so any previous can be removed) and to be 233 * sure flushes wait for their corresponding old 234 * message to go out first. 235 * Waiting flush messages do not go on the write queue, 236 * they are processed after the old message is dealt 237 * with. There's no real need to protect the setting of 238 * Msg.nowq, the only code to check it runs in this 239 * process after this routine returns. 240 */ 241 if((flush = old->flush) != nil){ 242 if(Dflag) 243 fprint(2, "msgFlush: remove %d from %d list\n", 244 old->flush->t.tag, old->t.tag); 245 m->flush = flush->flush; 246 flush->flush = nil; 247 msgMunlink(flush); 248 msgFree(flush); 249 } 250 old->flush = m; 251 m->nowq = 1; 252 253 if(Dflag) 254 fprint(2, "msgFlush: add %d to %d queue\n", 255 m->t.tag, old->t.tag); 256 vtUnlock(con->mlock); 257 } 258 259 static void 260 msgProc(void*) 261 { 262 Msg *m; 263 char *e; 264 Con *con; 265 266 vtThreadSetName("msgProc"); 267 268 for(;;){ 269 /* 270 * If surplus to requirements, exit. 271 * If not, wait for and pull a message off 272 * the read queue. 273 */ 274 vtLock(mbox.rlock); 275 if(mbox.nproc > mbox.maxproc){ 276 mbox.nproc--; 277 vtUnlock(mbox.rlock); 278 break; 279 } 280 while(mbox.rhead == nil) 281 vtSleep(mbox.rrendez); 282 m = mbox.rhead; 283 mbox.rhead = m->rwnext; 284 m->rwnext = nil; 285 vtUnlock(mbox.rlock); 286 287 con = m->con; 288 e = nil; 289 290 /* 291 * If the message has been flushed before 292 * any 9P processing has started, mark it so 293 * none will be attempted. 294 */ 295 vtLock(con->mlock); 296 if(m->state == MsgF) 297 e = "flushed"; 298 else 299 m->state = Msg9; 300 vtUnlock(con->mlock); 301 302 if(e == nil){ 303 /* 304 * explain this 305 */ 306 vtLock(con->lock); 307 if(m->t.type == Tversion){ 308 con->version = m; 309 con->state = ConDown; 310 while(con->mhead != m) 311 vtSleep(con->rendez); 312 assert(con->state == ConDown); 313 if(con->version == m){ 314 con->version = nil; 315 con->state = ConInit; 316 } 317 else 318 e = "Tversion aborted"; 319 } 320 else if(con->state != ConUp) 321 e = "connection not ready"; 322 vtUnlock(con->lock); 323 } 324 325 /* 326 * Dispatch if not error already. 327 */ 328 m->r.tag = m->t.tag; 329 if(e == nil && !(*rFcall[m->t.type])(m)) 330 e = vtGetError(); 331 if(e != nil){ 332 m->r.type = Rerror; 333 m->r.ename = e; 334 } 335 else 336 m->r.type = m->t.type+1; 337 338 /* 339 * Put the message (with reply) on the 340 * write queue and wakeup the write process. 341 */ 342 if(!m->nowq){ 343 vtLock(con->wlock); 344 if(con->whead == nil) 345 con->whead = m; 346 else 347 con->wtail->rwnext = m; 348 con->wtail = m; 349 vtWakeup(con->wrendez); 350 vtUnlock(con->wlock); 351 } 352 } 353 } 354 355 static void 356 msgRead(void* v) 357 { 358 Msg *m; 359 Con *con; 360 int eof, fd, n; 361 362 vtThreadSetName("msgRead"); 363 364 con = v; 365 fd = con->fd; 366 eof = 0; 367 368 while(!eof){ 369 m = msgAlloc(con); 370 371 while((n = read9pmsg(fd, m->data, con->msize)) == 0) 372 ; 373 if(n < 0){ 374 m->t.type = Tversion; 375 m->t.fid = NOFID; 376 m->t.tag = NOTAG; 377 m->t.msize = con->msize; 378 m->t.version = "9PEoF"; 379 eof = 1; 380 } 381 else if(convM2S(m->data, n, &m->t) != n){ 382 if(Dflag) 383 fprint(2, "msgRead: convM2S error: %s\n", 384 con->name); 385 msgFree(m); 386 continue; 387 } 388 if(Dflag) 389 fprint(2, "msgRead %p: t %F\n", con, &m->t); 390 391 vtLock(con->mlock); 392 if(con->mtail != nil){ 393 m->mprev = con->mtail; 394 con->mtail->mnext = m; 395 } 396 else{ 397 con->mhead = m; 398 m->mprev = nil; 399 } 400 con->mtail = m; 401 vtUnlock(con->mlock); 402 403 vtLock(mbox.rlock); 404 if(mbox.rhead == nil){ 405 mbox.rhead = m; 406 if(!vtWakeup(mbox.rrendez)){ 407 if(mbox.nproc < mbox.maxproc){ 408 if(vtThread(msgProc, nil) > 0) 409 mbox.nproc++; 410 } 411 else 412 mbox.nprocstarve++; 413 } 414 /* 415 * don't need this surely? 416 vtWakeup(mbox.rrendez); 417 */ 418 } 419 else 420 mbox.rtail->rwnext = m; 421 mbox.rtail = m; 422 vtUnlock(mbox.rlock); 423 } 424 } 425 426 static void 427 msgWrite(void* v) 428 { 429 Con *con; 430 int eof, n; 431 Msg *flush, *m; 432 433 vtThreadSetName("msgWrite"); 434 435 con = v; 436 if(vtThread(msgRead, con) < 0){ 437 conFree(con); 438 return; 439 } 440 441 for(;;){ 442 /* 443 * Wait for and pull a message off the write queue. 444 */ 445 vtLock(con->wlock); 446 while(con->whead == nil) 447 vtSleep(con->wrendez); 448 m = con->whead; 449 con->whead = m->rwnext; 450 m->rwnext = nil; 451 assert(!m->nowq); 452 vtUnlock(con->wlock); 453 454 eof = 0; 455 456 /* 457 * Write each message (if it hasn't been flushed) 458 * followed by any messages waiting for it to complete. 459 */ 460 vtLock(con->mlock); 461 while(m != nil){ 462 msgMunlink(m); 463 464 if(Dflag) 465 fprint(2, "msgWrite %d: r %F\n", 466 m->state, &m->r); 467 468 if(m->state != MsgF){ 469 m->state = MsgW; 470 vtUnlock(con->mlock); 471 472 n = convS2M(&m->r, con->data, con->msize); 473 if(write(con->fd, con->data, n) != n) 474 eof = 1; 475 476 vtLock(con->mlock); 477 } 478 479 if((flush = m->flush) != nil){ 480 assert(flush->nowq); 481 m->flush = nil; 482 } 483 msgFree(m); 484 m = flush; 485 } 486 vtUnlock(con->mlock); 487 488 vtLock(con->lock); 489 if(eof && con->fd >= 0){ 490 close(con->fd); 491 con->fd = -1; 492 } 493 if(con->state == ConDown) 494 vtWakeup(con->rendez); 495 if(con->state == ConMoribund && con->mhead == nil){ 496 vtUnlock(con->lock); 497 conFree(con); 498 break; 499 } 500 vtUnlock(con->lock); 501 } 502 } 503 504 Con* 505 conAlloc(int fd, char* name, int flags) 506 { 507 Con *con; 508 char buf[128], *p; 509 int rfd, n; 510 511 vtLock(cbox.alock); 512 while(cbox.ahead == nil){ 513 if(cbox.ncon >= cbox.maxcon){ 514 cbox.nconstarve++; 515 vtSleep(cbox.arendez); 516 continue; 517 } 518 con = vtMemAllocZ(sizeof(Con)); 519 con->lock = vtLockAlloc(); 520 con->rendez = vtRendezAlloc(con->lock); 521 con->data = vtMemAlloc(cbox.msize); 522 con->msize = cbox.msize; 523 con->alock = vtLockAlloc(); 524 con->mlock = vtLockAlloc(); 525 con->mrendez = vtRendezAlloc(con->mlock); 526 con->wlock = vtLockAlloc(); 527 con->wrendez = vtRendezAlloc(con->wlock); 528 con->fidlock = vtLockAlloc(); 529 530 cbox.ncon++; 531 cbox.ahead = con; 532 break; 533 } 534 con = cbox.ahead; 535 cbox.ahead = con->anext; 536 con->anext = nil; 537 538 if(cbox.ctail != nil){ 539 con->cprev = cbox.ctail; 540 cbox.ctail->cnext = con; 541 } 542 else{ 543 cbox.chead = con; 544 con->cprev = nil; 545 } 546 cbox.ctail = con; 547 548 assert(con->mhead == nil); 549 assert(con->whead == nil); 550 assert(con->fhead == nil); 551 assert(con->nfid == 0); 552 553 con->state = ConNew; 554 con->fd = fd; 555 if(con->name != nil){ 556 vtMemFree(con->name); 557 con->name = nil; 558 } 559 if(name != nil) 560 con->name = vtStrDup(name); 561 else 562 con->name = vtStrDup("unknown"); 563 con->remote[0] = 0; 564 snprint(buf, sizeof buf, "%s/remote", con->name); 565 if((rfd = open(buf, OREAD)) >= 0){ 566 n = read(rfd, buf, sizeof buf-1); 567 close(rfd); 568 if(n > 0){ 569 buf[n] = 0; 570 if((p = strchr(buf, '\n')) != nil) 571 *p = 0; 572 strecpy(con->remote, con->remote+sizeof con->remote, buf); 573 } 574 } 575 con->flags = flags; 576 con->isconsole = 0; 577 vtUnlock(cbox.alock); 578 579 if(vtThread(msgWrite, con) < 0){ 580 conFree(con); 581 return nil; 582 } 583 584 return con; 585 } 586 587 static int 588 cmdMsg(int argc, char* argv[]) 589 { 590 char *p; 591 char *usage = "usage: msg [-m nmsg] [-p nproc]"; 592 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve; 593 594 maxmsg = maxproc = 0; 595 596 ARGBEGIN{ 597 default: 598 return cliError(usage); 599 case 'm': 600 p = ARGF(); 601 if(p == nil) 602 return cliError(usage); 603 maxmsg = strtol(argv[0], &p, 0); 604 if(maxmsg <= 0 || p == argv[0] || *p != '\0') 605 return cliError(usage); 606 break; 607 case 'p': 608 p = ARGF(); 609 if(p == nil) 610 return cliError(usage); 611 maxproc = strtol(argv[0], &p, 0); 612 if(maxproc <= 0 || p == argv[0] || *p != '\0') 613 return cliError(usage); 614 break; 615 }ARGEND 616 if(argc) 617 return cliError(usage); 618 619 vtLock(mbox.alock); 620 if(maxmsg) 621 mbox.maxmsg = maxmsg; 622 maxmsg = mbox.maxmsg; 623 nmsg = mbox.nmsg; 624 nmsgstarve = mbox.nmsgstarve; 625 vtUnlock(mbox.alock); 626 627 vtLock(mbox.rlock); 628 if(maxproc) 629 mbox.maxproc = maxproc; 630 maxproc = mbox.maxproc; 631 nproc = mbox.nproc; 632 nprocstarve = mbox.nprocstarve; 633 vtUnlock(mbox.rlock); 634 635 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc); 636 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n", 637 nmsg, nmsgstarve, nproc, nprocstarve); 638 639 return 1; 640 } 641 642 static int 643 scmp(Fid *a, Fid *b) 644 { 645 if(a == 0) 646 return 1; 647 if(b == 0) 648 return -1; 649 return strcmp(a->uname, b->uname); 650 } 651 652 static Fid* 653 fidMerge(Fid *a, Fid *b) 654 { 655 Fid *s, **l; 656 657 l = &s; 658 while(a || b){ 659 if(scmp(a, b) < 0){ 660 *l = a; 661 l = &a->sort; 662 a = a->sort; 663 }else{ 664 *l = b; 665 l = &b->sort; 666 b = b->sort; 667 } 668 } 669 *l = 0; 670 return s; 671 } 672 673 static Fid* 674 fidMergeSort(Fid *f) 675 { 676 int delay; 677 Fid *a, *b; 678 679 if(f == nil) 680 return nil; 681 if(f->sort == nil) 682 return f; 683 684 a = b = f; 685 delay = 1; 686 while(a && b){ 687 if(delay) /* easy way to handle 2-element list */ 688 delay = 0; 689 else 690 a = a->sort; 691 if(b = b->sort) 692 b = b->sort; 693 } 694 695 b = a->sort; 696 a->sort = nil; 697 698 a = fidMergeSort(f); 699 b = fidMergeSort(b); 700 701 return fidMerge(a, b); 702 } 703 704 static int 705 cmdWho(int argc, char* argv[]) 706 { 707 char *usage = "usage: who"; 708 int i, l1, l2, l; 709 Con *con; 710 Fid *fid, *last; 711 712 ARGBEGIN{ 713 default: 714 return cliError(usage); 715 }ARGEND 716 717 if(argc > 0) 718 return cliError(usage); 719 720 vtRLock(cbox.clock); 721 l1 = 0; 722 l2 = 0; 723 for(con=cbox.chead; con; con=con->cnext){ 724 if((l = strlen(con->name)) > l1) 725 l1 = l; 726 if((l = strlen(con->remote)) > l2) 727 l2 = l; 728 } 729 for(con=cbox.chead; con; con=con->cnext){ 730 consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote); 731 vtLock(con->fidlock); 732 last = nil; 733 for(i=0; i<NFidHash; i++) 734 for(fid=con->fidhash[i]; fid; fid=fid->hash) 735 if(fid->fidno != NOFID && fid->uname){ 736 fid->sort = last; 737 last = fid; 738 } 739 fid = fidMergeSort(last); 740 last = nil; 741 for(; fid; last=fid, fid=fid->sort) 742 if(last==nil || strcmp(fid->uname, last->uname) != 0) 743 consPrint(" %q", fid->uname); 744 vtUnlock(con->fidlock); 745 consPrint("\n"); 746 } 747 vtRUnlock(cbox.clock); 748 return 1; 749 } 750 751 void 752 msgInit(void) 753 { 754 mbox.alock = vtLockAlloc(); 755 mbox.arendez = vtRendezAlloc(mbox.alock); 756 757 mbox.rlock = vtLockAlloc(); 758 mbox.rrendez = vtRendezAlloc(mbox.rlock); 759 760 mbox.maxmsg = NMsgInit; 761 mbox.maxproc = NMsgProcInit; 762 mbox.msize = NMsizeInit; 763 764 cliAddCmd("msg", cmdMsg); 765 } 766 767 static int 768 cmdCon(int argc, char* argv[]) 769 { 770 char *p; 771 Con *con; 772 char *usage = "usage: con [-m ncon]"; 773 int maxcon, ncon, nconstarve; 774 775 maxcon = 0; 776 777 ARGBEGIN{ 778 default: 779 return cliError(usage); 780 case 'm': 781 p = ARGF(); 782 if(p == nil) 783 return cliError(usage); 784 maxcon = strtol(argv[0], &p, 0); 785 if(maxcon <= 0 || p == argv[0] || *p != '\0') 786 return cliError(usage); 787 break; 788 }ARGEND 789 if(argc) 790 return cliError(usage); 791 792 vtLock(cbox.clock); 793 if(maxcon) 794 cbox.maxcon = maxcon; 795 maxcon = cbox.maxcon; 796 ncon = cbox.ncon; 797 nconstarve = cbox.nconstarve; 798 vtUnlock(cbox.clock); 799 800 consPrint("\tcon -m %d\n", maxcon); 801 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve); 802 803 vtRLock(cbox.clock); 804 for(con = cbox.chead; con != nil; con = con->cnext){ 805 consPrint("\t%s\n", con->name); 806 } 807 vtRUnlock(cbox.clock); 808 809 return 1; 810 } 811 812 void 813 conInit(void) 814 { 815 cbox.alock = vtLockAlloc(); 816 cbox.arendez = vtRendezAlloc(cbox.alock); 817 818 cbox.clock = vtLockAlloc(); 819 820 cbox.maxcon = NConInit; 821 cbox.msize = NMsizeInit; 822 823 cliAddCmd("con", cmdCon); 824 cliAddCmd("who", cmdWho); 825 } 826