1 #include "stdinc.h" 2 3 #include "9.h" 4 #include "dat.h" 5 #include "fns.h" 6 7 enum { 8 NConInit = 128, 9 NMsgInit = 384, 10 NMsgProcInit = 64, 11 NMsizeInit = 8192+IOHDRSZ, 12 }; 13 14 static struct { 15 VtLock* alock; /* alloc */ 16 Msg* ahead; 17 VtRendez* arendez; 18 19 int maxmsg; 20 int nmsg; 21 int nmsgstarve; 22 23 VtLock* rlock; /* read */ 24 Msg* rhead; 25 Msg* rtail; 26 VtRendez* rrendez; 27 28 int maxproc; 29 int nproc; 30 int nprocstarve; 31 32 u32int msize; /* immutable */ 33 } mbox; 34 35 static struct { 36 VtLock* alock; /* alloc */ 37 Con* ahead; 38 VtRendez* arendez; 39 40 VtLock* clock; 41 Con* chead; 42 Con* ctail; 43 44 int maxcon; 45 int ncon; 46 int nconstarve; 47 48 u32int msize; 49 } cbox; 50 51 static void 52 conFree(Con* con) 53 { 54 assert(con->version == nil); 55 assert(con->mhead == nil); 56 assert(con->whead == nil); 57 assert(con->nfid == 0); 58 assert(con->state == ConMoribund); 59 60 if(con->fd >= 0){ 61 close(con->fd); 62 con->fd = -1; 63 } 64 con->state = ConDead; 65 con->aok = 0; 66 con->noauth = con->noperm = con->wstatallow = 0; 67 con->isconsole = 0; 68 69 vtLock(cbox.alock); 70 if(con->cprev != nil) 71 con->cprev->cnext = con->cnext; 72 else 73 cbox.chead = con->cnext; 74 if(con->cnext != nil) 75 con->cnext->cprev = con->cprev; 76 else 77 cbox.ctail = con->cprev; 78 con->cprev = con->cnext = nil; 79 80 if(cbox.ncon > cbox.maxcon){ 81 if(con->name != nil) 82 vtMemFree(con->name); 83 vtLockFree(con->fidlock); 84 vtMemFree(con->data); 85 vtRendezFree(con->wrendez); 86 vtLockFree(con->wlock); 87 vtRendezFree(con->mrendez); 88 vtLockFree(con->mlock); 89 vtRendezFree(con->rendez); 90 vtLockFree(con->lock); 91 vtMemFree(con); 92 cbox.ncon--; 93 vtUnlock(cbox.alock); 94 return; 95 } 96 con->anext = cbox.ahead; 97 cbox.ahead = con; 98 if(con->anext == nil) 99 vtWakeup(cbox.arendez); 100 vtUnlock(cbox.alock); 101 } 102 103 static void 104 msgFree(Msg* m) 105 { 106 assert(m->rwnext == nil); 107 assert(m->flush == nil); 108 109 vtLock(mbox.alock); 110 if(mbox.nmsg > mbox.maxmsg){ 111 vtMemFree(m->data); 112 vtMemFree(m); 113 mbox.nmsg--; 114 vtUnlock(mbox.alock); 115 return; 116 } 117 m->anext = mbox.ahead; 118 mbox.ahead = m; 119 if(m->anext == nil) 120 vtWakeup(mbox.arendez); 121 vtUnlock(mbox.alock); 122 } 123 124 static Msg* 125 msgAlloc(Con* con) 126 { 127 Msg *m; 128 129 vtLock(mbox.alock); 130 while(mbox.ahead == nil){ 131 if(mbox.nmsg >= mbox.maxmsg){ 132 mbox.nmsgstarve++; 133 vtSleep(mbox.arendez); 134 continue; 135 } 136 m = vtMemAllocZ(sizeof(Msg)); 137 m->data = vtMemAlloc(mbox.msize); 138 m->msize = mbox.msize; 139 mbox.nmsg++; 140 mbox.ahead = m; 141 break; 142 } 143 m = mbox.ahead; 144 mbox.ahead = m->anext; 145 m->anext = nil; 146 vtUnlock(mbox.alock); 147 148 m->con = con; 149 m->state = MsgR; 150 m->nowq = 0; 151 152 return m; 153 } 154 155 static void 156 msgMunlink(Msg* m) 157 { 158 Con *con; 159 160 con = m->con; 161 162 if(m->mprev != nil) 163 m->mprev->mnext = m->mnext; 164 else 165 con->mhead = m->mnext; 166 if(m->mnext != nil) 167 m->mnext->mprev = m->mprev; 168 else 169 con->mtail = m->mprev; 170 m->mprev = m->mnext = nil; 171 } 172 173 void 174 msgFlush(Msg* m) 175 { 176 Con *con; 177 Msg *flush, *old; 178 179 con = m->con; 180 181 if(Dflag) 182 fprint(2, "msgFlush %F\n", &m->t); 183 184 /* 185 * If this Tflush has been flushed, nothing to do. 186 * Look for the message to be flushed in the 187 * queue of all messages still on this connection. 188 * If it's not found must assume Elvis has already 189 * left the building and reply normally. 190 */ 191 vtLock(con->mlock); 192 if(m->state == MsgF){ 193 vtUnlock(con->mlock); 194 return; 195 } 196 for(old = con->mhead; old != nil; old = old->mnext) 197 if(old->t.tag == m->t.oldtag) 198 break; 199 if(old == nil){ 200 if(Dflag) 201 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag); 202 vtUnlock(con->mlock); 203 return; 204 } 205 206 if(Dflag) 207 fprint(2, "\tmsgFlush found %F\n", &old->t); 208 209 /* 210 * Found it. 211 * There are two cases where the old message can be 212 * truly flushed and no reply to the original message given. 213 * The first is when the old message is in MsgR state; no 214 * processing has been done yet and it is still on the read 215 * queue. The second is if old is a Tflush, which doesn't 216 * affect the server state. In both cases, put the old 217 * message into MsgF state and let MsgWrite toss it after 218 * pulling it off the queue. 219 */ 220 if(old->state == MsgR || old->t.type == Tflush){ 221 old->state = MsgF; 222 if(Dflag) 223 fprint(2, "msgFlush: change %d from MsgR to MsgF\n", 224 m->t.oldtag); 225 } 226 227 /* 228 * Link this flush message and the old message 229 * so multiple flushes can be coalesced (if there are 230 * multiple Tflush messages for a particular pending 231 * request, it is only necessary to respond to the last 232 * one, so any previous can be removed) and to be 233 * sure flushes wait for their corresponding old 234 * message to go out first. 235 * Waiting flush messages do not go on the write queue, 236 * they are processed after the old message is dealt 237 * with. There's no real need to protect the setting of 238 * Msg.nowq, the only code to check it runs in this 239 * process after this routine returns. 240 */ 241 if((flush = old->flush) != nil){ 242 if(Dflag) 243 fprint(2, "msgFlush: remove %d from %d list\n", 244 old->flush->t.tag, old->t.tag); 245 m->flush = flush->flush; 246 flush->flush = nil; 247 msgMunlink(flush); 248 msgFree(flush); 249 } 250 old->flush = m; 251 m->nowq = 1; 252 253 if(Dflag) 254 fprint(2, "msgFlush: add %d to %d queue\n", 255 m->t.tag, old->t.tag); 256 vtUnlock(con->mlock); 257 } 258 259 static void 260 msgProc(void*) 261 { 262 Msg *m; 263 char *e; 264 Con *con; 265 266 vtThreadSetName("msgProc"); 267 268 for(;;){ 269 /* 270 * If surplus to requirements, exit. 271 * If not, wait for and pull a message off 272 * the read queue. 273 */ 274 vtLock(mbox.rlock); 275 if(mbox.nproc > mbox.maxproc){ 276 mbox.nproc--; 277 vtUnlock(mbox.rlock); 278 break; 279 } 280 while(mbox.rhead == nil) 281 vtSleep(mbox.rrendez); 282 m = mbox.rhead; 283 mbox.rhead = m->rwnext; 284 m->rwnext = nil; 285 vtUnlock(mbox.rlock); 286 287 con = m->con; 288 e = nil; 289 290 /* 291 * If the message has been flushed before 292 * any 9P processing has started, mark it so 293 * none will be attempted. 294 */ 295 vtLock(con->mlock); 296 if(m->state == MsgF) 297 e = "flushed"; 298 else 299 m->state = Msg9; 300 vtUnlock(con->mlock); 301 302 if(e == nil){ 303 /* 304 * explain this 305 */ 306 vtLock(con->lock); 307 if(m->t.type == Tversion){ 308 con->version = m; 309 con->state = ConDown; 310 while(con->mhead != m) 311 vtSleep(con->rendez); 312 assert(con->state == ConDown); 313 if(con->version == m){ 314 con->version = nil; 315 con->state = ConInit; 316 } 317 else 318 e = "Tversion aborted"; 319 } 320 else if(con->state != ConUp) 321 e = "connection not ready"; 322 vtUnlock(con->lock); 323 } 324 325 /* 326 * Dispatch if not error already. 327 */ 328 m->r.tag = m->t.tag; 329 if(e == nil && !(*rFcall[m->t.type])(m)) 330 e = vtGetError(); 331 if(e != nil){ 332 m->r.type = Rerror; 333 m->r.ename = e; 334 } 335 else 336 m->r.type = m->t.type+1; 337 338 /* 339 * Put the message (with reply) on the 340 * write queue and wakeup the write process. 341 */ 342 if(!m->nowq){ 343 vtLock(con->wlock); 344 if(con->whead == nil) 345 con->whead = m; 346 else 347 con->wtail->rwnext = m; 348 con->wtail = m; 349 vtWakeup(con->wrendez); 350 vtUnlock(con->wlock); 351 } 352 } 353 } 354 355 static void 356 msgRead(void* v) 357 { 358 Msg *m; 359 Con *con; 360 int eof, fd, n; 361 362 vtThreadSetName("msgRead"); 363 364 con = v; 365 fd = con->fd; 366 eof = 0; 367 368 while(!eof){ 369 m = msgAlloc(con); 370 371 while((n = read9pmsg(fd, m->data, con->msize)) == 0) 372 ; 373 if(n < 0){ 374 m->t.type = Tversion; 375 m->t.fid = NOFID; 376 m->t.tag = NOTAG; 377 m->t.msize = con->msize; 378 m->t.version = "9PEoF"; 379 eof = 1; 380 } 381 else if(convM2S(m->data, n, &m->t) != n){ 382 if(Dflag) 383 fprint(2, "msgRead: convM2S error: %s\n", 384 con->name); 385 msgFree(m); 386 continue; 387 } 388 if(Dflag) 389 fprint(2, "msgRead %p: t %F\n", con, &m->t); 390 391 vtLock(con->mlock); 392 if(con->mtail != nil){ 393 m->mprev = con->mtail; 394 con->mtail->mnext = m; 395 } 396 else{ 397 con->mhead = m; 398 m->mprev = nil; 399 } 400 con->mtail = m; 401 vtUnlock(con->mlock); 402 403 vtLock(mbox.rlock); 404 if(mbox.rhead == nil){ 405 mbox.rhead = m; 406 if(!vtWakeup(mbox.rrendez)){ 407 if(mbox.nproc < mbox.maxproc){ 408 if(vtThread(msgProc, nil) > 0) 409 mbox.nproc++; 410 } 411 else 412 mbox.nprocstarve++; 413 } 414 /* 415 * don't need this surely? 416 vtWakeup(mbox.rrendez); 417 */ 418 } 419 else 420 mbox.rtail->rwnext = m; 421 mbox.rtail = m; 422 vtUnlock(mbox.rlock); 423 } 424 } 425 426 static void 427 msgWrite(void* v) 428 { 429 Con *con; 430 int eof, n; 431 Msg *flush, *m; 432 433 vtThreadSetName("msgWrite"); 434 435 con = v; 436 if(vtThread(msgRead, con) < 0){ 437 conFree(con); 438 return; 439 } 440 441 for(;;){ 442 /* 443 * Wait for and pull a message off the write queue. 444 */ 445 vtLock(con->wlock); 446 while(con->whead == nil) 447 vtSleep(con->wrendez); 448 m = con->whead; 449 con->whead = m->rwnext; 450 m->rwnext = nil; 451 assert(!m->nowq); 452 vtUnlock(con->wlock); 453 454 eof = 0; 455 456 /* 457 * Write each message (if it hasn't been flushed) 458 * followed by any messages waiting for it to complete. 459 */ 460 vtLock(con->mlock); 461 while(m != nil){ 462 msgMunlink(m); 463 464 if(Dflag) 465 fprint(2, "msgWrite %d: r %F\n", 466 m->state, &m->r); 467 468 if(m->state != MsgF){ 469 m->state = MsgW; 470 vtUnlock(con->mlock); 471 472 n = convS2M(&m->r, con->data, con->msize); 473 if(write(con->fd, con->data, n) != n) 474 eof = 1; 475 476 vtLock(con->mlock); 477 } 478 479 if((flush = m->flush) != nil){ 480 assert(flush->nowq); 481 m->flush = nil; 482 } 483 msgFree(m); 484 m = flush; 485 } 486 vtUnlock(con->mlock); 487 488 vtLock(con->lock); 489 if(eof && con->fd >= 0){ 490 close(con->fd); 491 con->fd = -1; 492 } 493 if(con->state == ConDown) 494 vtWakeup(con->rendez); 495 if(con->state == ConMoribund && con->mhead == nil){ 496 vtUnlock(con->lock); 497 conFree(con); 498 break; 499 } 500 vtUnlock(con->lock); 501 } 502 } 503 504 Con* 505 conAlloc(int fd, char* name) 506 { 507 Con *con; 508 509 vtLock(cbox.alock); 510 while(cbox.ahead == nil){ 511 if(cbox.ncon >= cbox.maxcon){ 512 cbox.nconstarve++; 513 vtSleep(cbox.arendez); 514 continue; 515 } 516 con = vtMemAllocZ(sizeof(Con)); 517 con->lock = vtLockAlloc(); 518 con->rendez = vtRendezAlloc(con->lock); 519 con->data = vtMemAlloc(cbox.msize); 520 con->msize = cbox.msize; 521 con->alock = vtLockAlloc(); 522 con->mlock = vtLockAlloc(); 523 con->mrendez = vtRendezAlloc(con->mlock); 524 con->wlock = vtLockAlloc(); 525 con->wrendez = vtRendezAlloc(con->wlock); 526 con->fidlock = vtLockAlloc(); 527 528 cbox.ncon++; 529 cbox.ahead = con; 530 break; 531 } 532 con = cbox.ahead; 533 cbox.ahead = con->anext; 534 con->anext = nil; 535 536 if(cbox.ctail != nil){ 537 con->cprev = cbox.ctail; 538 cbox.ctail->cnext = con; 539 } 540 else{ 541 cbox.chead = con; 542 con->cprev = nil; 543 } 544 cbox.ctail = con; 545 546 assert(con->mhead == nil); 547 assert(con->whead == nil); 548 assert(con->fhead == nil); 549 assert(con->nfid == 0); 550 551 con->state = ConNew; 552 con->fd = fd; 553 if(con->name != nil){ 554 vtMemFree(con->name); 555 con->name = nil; 556 } 557 if(name != nil) 558 con->name = vtStrDup(name); 559 else 560 con->name = vtStrDup("unknown"); 561 con->aok = 0; 562 con->noauth = con->noperm = con->wstatallow = 0; 563 con->isconsole = 0; 564 vtUnlock(cbox.alock); 565 566 if(vtThread(msgWrite, con) < 0){ 567 conFree(con); 568 return nil; 569 } 570 571 return con; 572 } 573 574 static int 575 cmdMsg(int argc, char* argv[]) 576 { 577 char *p; 578 char *usage = "usage: msg [-m nmsg] [-p nproc]"; 579 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve; 580 581 maxmsg = maxproc = 0; 582 583 ARGBEGIN{ 584 default: 585 return cliError(usage); 586 case 'm': 587 p = ARGF(); 588 if(p == nil) 589 return cliError(usage); 590 maxmsg = strtol(argv[0], &p, 0); 591 if(maxmsg <= 0 || p == argv[0] || *p != '\0') 592 return cliError(usage); 593 break; 594 case 'p': 595 p = ARGF(); 596 if(p == nil) 597 return cliError(usage); 598 maxproc = strtol(argv[0], &p, 0); 599 if(maxproc <= 0 || p == argv[0] || *p != '\0') 600 return cliError(usage); 601 break; 602 }ARGEND 603 if(argc) 604 return cliError(usage); 605 606 vtLock(mbox.alock); 607 if(maxmsg) 608 mbox.maxmsg = maxmsg; 609 maxmsg = mbox.maxmsg; 610 nmsg = mbox.nmsg; 611 nmsgstarve = mbox.nmsgstarve; 612 vtUnlock(mbox.alock); 613 614 vtLock(mbox.rlock); 615 if(maxproc) 616 mbox.maxproc = maxproc; 617 maxproc = mbox.maxproc; 618 nproc = mbox.nproc; 619 nprocstarve = mbox.nprocstarve; 620 vtUnlock(mbox.rlock); 621 622 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc); 623 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n", 624 nmsg, nmsgstarve, nproc, nprocstarve); 625 626 return 1; 627 } 628 629 static int 630 scmp(Fid *a, Fid *b) 631 { 632 if(a == 0) 633 return 1; 634 if(b == 0) 635 return -1; 636 return strcmp(a->uname, b->uname); 637 } 638 639 static Fid* 640 fidMerge(Fid *a, Fid *b) 641 { 642 Fid *s, **l; 643 644 l = &s; 645 while(a || b){ 646 if(scmp(a, b) < 0){ 647 *l = a; 648 l = &a->sort; 649 a = a->sort; 650 }else{ 651 *l = b; 652 l = &b->sort; 653 b = b->sort; 654 } 655 } 656 *l = 0; 657 return s; 658 } 659 660 static Fid* 661 fidMergeSort(Fid *f) 662 { 663 int delay; 664 Fid *a, *b; 665 666 if(f == nil) 667 return nil; 668 if(f->sort == nil) 669 return f; 670 671 a = b = f; 672 delay = 1; 673 while(a && b){ 674 if(delay) /* easy way to handle 2-element list */ 675 delay = 0; 676 else 677 a = a->sort; 678 if(b = b->sort) 679 b = b->sort; 680 } 681 682 b = a->sort; 683 a->sort = nil; 684 685 a = fidMergeSort(f); 686 b = fidMergeSort(b); 687 688 return fidMerge(a, b); 689 } 690 691 static int 692 cmdWho(int argc, char* argv[]) 693 { 694 char *usage = "usage: who"; 695 int i; 696 Con *con; 697 Fid *fid, *last; 698 699 ARGBEGIN{ 700 default: 701 return cliError(usage); 702 }ARGEND 703 704 if(argc > 0) 705 return cliError(usage); 706 707 vtRLock(cbox.clock); 708 for(con=cbox.chead; con; con=con->cnext){ 709 consPrint("\t%q:", con->name); 710 vtLock(con->fidlock); 711 last = nil; 712 for(i=0; i<NFidHash; i++) 713 for(fid=con->fidhash[i]; fid; fid=fid->hash) 714 if(fid->fidno != NOFID && fid->uname){ 715 fid->sort = last; 716 last = fid; 717 } 718 fid = fidMergeSort(last); 719 last = nil; 720 for(; fid; last=fid, fid=fid->sort) 721 if(last==nil || strcmp(fid->uname, last->uname) != 0) 722 consPrint(" %q", fid->uname); 723 vtUnlock(con->fidlock); 724 consPrint("\n"); 725 } 726 vtRUnlock(cbox.clock); 727 return 1; 728 } 729 730 void 731 msgInit(void) 732 { 733 mbox.alock = vtLockAlloc(); 734 mbox.arendez = vtRendezAlloc(mbox.alock); 735 736 mbox.rlock = vtLockAlloc(); 737 mbox.rrendez = vtRendezAlloc(mbox.rlock); 738 739 mbox.maxmsg = NMsgInit; 740 mbox.maxproc = NMsgProcInit; 741 mbox.msize = NMsizeInit; 742 743 cliAddCmd("msg", cmdMsg); 744 } 745 746 static int 747 cmdCon(int argc, char* argv[]) 748 { 749 char *p; 750 Con *con; 751 char *usage = "usage: con [-m ncon]"; 752 int maxcon, ncon, nconstarve; 753 754 maxcon = 0; 755 756 ARGBEGIN{ 757 default: 758 return cliError(usage); 759 case 'm': 760 p = ARGF(); 761 if(p == nil) 762 return cliError(usage); 763 maxcon = strtol(argv[0], &p, 0); 764 if(maxcon <= 0 || p == argv[0] || *p != '\0') 765 return cliError(usage); 766 break; 767 }ARGEND 768 if(argc) 769 return cliError(usage); 770 771 vtLock(cbox.clock); 772 if(maxcon) 773 cbox.maxcon = maxcon; 774 maxcon = cbox.maxcon; 775 ncon = cbox.ncon; 776 nconstarve = cbox.nconstarve; 777 vtUnlock(cbox.clock); 778 779 consPrint("\tcon -m %d\n", maxcon); 780 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve); 781 782 vtRLock(cbox.clock); 783 for(con = cbox.chead; con != nil; con = con->cnext){ 784 consPrint("\t%s\n", con->name); 785 } 786 vtRUnlock(cbox.clock); 787 788 return 1; 789 } 790 791 void 792 conInit(void) 793 { 794 cbox.alock = vtLockAlloc(); 795 cbox.arendez = vtRendezAlloc(cbox.alock); 796 797 cbox.clock = vtLockAlloc(); 798 799 cbox.maxcon = NConInit; 800 cbox.msize = NMsizeInit; 801 802 cliAddCmd("con", cmdCon); 803 cliAddCmd("who", cmdWho); 804 } 805