1 #include "stdinc.h" 2 3 #include "9.h" 4 #include "dat.h" 5 #include "fns.h" 6 7 enum { 8 NConInit = 128, 9 NMsgInit = 384, 10 NMsgProcInit = 64, 11 NMsizeInit = 8192+IOHDRSZ, 12 }; 13 14 static struct { 15 VtLock* alock; /* alloc */ 16 Msg* ahead; 17 VtRendez* arendez; 18 19 int maxmsg; 20 int nmsg; 21 int nmsgstarve; 22 23 VtLock* rlock; /* read */ 24 Msg* rhead; 25 Msg* rtail; 26 VtRendez* rrendez; 27 28 int maxproc; 29 int nproc; 30 int nprocstarve; 31 32 u32int msize; /* immutable */ 33 } mbox; 34 35 static struct { 36 VtLock* alock; /* alloc */ 37 Con* ahead; 38 VtRendez* arendez; 39 40 VtLock* clock; 41 Con* chead; 42 Con* ctail; 43 44 int maxcon; 45 int ncon; 46 int nconstarve; 47 48 u32int msize; 49 } cbox; 50 51 static void 52 conFree(Con* con) 53 { 54 assert(con->version == nil); 55 assert(con->mhead == nil); 56 assert(con->whead == nil); 57 assert(con->nfid == 0); 58 assert(con->state == ConMoribund); 59 60 if(con->fd >= 0){ 61 close(con->fd); 62 con->fd = -1; 63 } 64 con->state = ConDead; 65 66 vtLock(cbox.alock); 67 if(con->cprev != nil) 68 con->cprev->cnext = con->cnext; 69 else 70 cbox.chead = con->cnext; 71 if(con->cnext != nil) 72 con->cnext->cprev = con->cprev; 73 else 74 cbox.ctail = con->cprev; 75 con->cprev = con->cnext = nil; 76 77 if(cbox.ncon > cbox.maxcon){ 78 if(con->name != nil) 79 vtMemFree(con->name); 80 vtLockFree(con->fidlock); 81 vtMemFree(con->data); 82 vtRendezFree(con->wrendez); 83 vtLockFree(con->wlock); 84 vtRendezFree(con->mrendez); 85 vtLockFree(con->mlock); 86 vtRendezFree(con->rendez); 87 vtLockFree(con->lock); 88 vtMemFree(con); 89 cbox.ncon--; 90 vtUnlock(cbox.alock); 91 return; 92 } 93 con->anext = cbox.ahead; 94 cbox.ahead = con; 95 if(con->anext == nil) 96 vtWakeup(cbox.arendez); 97 vtUnlock(cbox.alock); 98 } 99 100 static void 101 msgFree(Msg* m) 102 { 103 assert(m->rwnext == nil); 104 assert(m->flush == nil); 105 106 vtLock(mbox.alock); 107 if(mbox.nmsg > mbox.maxmsg){ 108 vtMemFree(m->data); 109 vtMemFree(m); 110 mbox.nmsg--; 111 vtUnlock(mbox.alock); 112 return; 113 } 114 m->anext = mbox.ahead; 115 mbox.ahead = m; 116 if(m->anext == nil) 117 vtWakeup(mbox.arendez); 118 vtUnlock(mbox.alock); 119 } 120 121 static Msg* 122 msgAlloc(Con* con) 123 { 124 Msg *m; 125 126 vtLock(mbox.alock); 127 while(mbox.ahead == nil){ 128 if(mbox.nmsg >= mbox.maxmsg){ 129 mbox.nmsgstarve++; 130 vtSleep(mbox.arendez); 131 continue; 132 } 133 m = vtMemAllocZ(sizeof(Msg)); 134 m->data = vtMemAlloc(mbox.msize); 135 m->msize = mbox.msize; 136 mbox.nmsg++; 137 mbox.ahead = m; 138 break; 139 } 140 m = mbox.ahead; 141 mbox.ahead = m->anext; 142 m->anext = nil; 143 vtUnlock(mbox.alock); 144 145 m->con = con; 146 m->state = MsgR; 147 m->nowq = 0; 148 149 return m; 150 } 151 152 static void 153 msgMunlink(Msg* m) 154 { 155 Con *con; 156 157 con = m->con; 158 159 if(m->mprev != nil) 160 m->mprev->mnext = m->mnext; 161 else 162 con->mhead = m->mnext; 163 if(m->mnext != nil) 164 m->mnext->mprev = m->mprev; 165 else 166 con->mtail = m->mprev; 167 m->mprev = m->mnext = nil; 168 } 169 170 void 171 msgFlush(Msg* m) 172 { 173 Con *con; 174 Msg *flush, *old; 175 176 con = m->con; 177 178 if(Dflag) 179 fprint(2, "msgFlush %F\n", &m->t); 180 181 /* 182 * If this Tflush has been flushed, nothing to do. 183 * Look for the message to be flushed in the 184 * queue of all messages still on this connection. 185 * If it's not found must assume Elvis has already 186 * left the building and reply normally. 187 */ 188 vtLock(con->mlock); 189 if(m->state == MsgF){ 190 vtUnlock(con->mlock); 191 return; 192 } 193 for(old = con->mhead; old != nil; old = old->mnext) 194 if(old->t.tag == m->t.oldtag) 195 break; 196 if(old == nil){ 197 if(Dflag) 198 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag); 199 vtUnlock(con->mlock); 200 return; 201 } 202 203 if(Dflag) 204 fprint(2, "\tmsgFlush found %F\n", &old->t); 205 206 /* 207 * Found it. 208 * There are two cases where the old message can be 209 * truly flushed and no reply to the original message given. 210 * The first is when the old message is in MsgR state; no 211 * processing has been done yet and it is still on the read 212 * queue. The second is if old is a Tflush, which doesn't 213 * affect the server state. In both cases, put the old 214 * message into MsgF state and let MsgWrite toss it after 215 * pulling it off the queue. 216 */ 217 if(old->state == MsgR || old->t.type == Tflush){ 218 old->state = MsgF; 219 if(Dflag) 220 fprint(2, "msgFlush: change %d from MsgR to MsgF\n", 221 m->t.oldtag); 222 } 223 224 /* 225 * Link this flush message and the old message 226 * so multiple flushes can be coalesced (if there are 227 * multiple Tflush messages for a particular pending 228 * request, it is only necessary to respond to the last 229 * one, so any previous can be removed) and to be 230 * sure flushes wait for their corresponding old 231 * message to go out first. 232 * Waiting flush messages do not go on the write queue, 233 * they are processed after the old message is dealt 234 * with. There's no real need to protect the setting of 235 * Msg.nowq, the only code to check it runs in this 236 * process after this routine returns. 237 */ 238 if((flush = old->flush) != nil){ 239 if(Dflag) 240 fprint(2, "msgFlush: remove %d from %d list\n", 241 old->flush->t.tag, old->t.tag); 242 m->flush = flush->flush; 243 flush->flush = nil; 244 msgMunlink(flush); 245 msgFree(flush); 246 } 247 old->flush = m; 248 m->nowq = 1; 249 250 if(Dflag) 251 fprint(2, "msgFlush: add %d to %d queue\n", 252 m->t.tag, old->t.tag); 253 vtUnlock(con->mlock); 254 } 255 256 static void 257 msgProc(void*) 258 { 259 Msg *m; 260 char *e; 261 Con *con; 262 263 vtThreadSetName("msgProc"); 264 265 for(;;){ 266 /* 267 * If surplus to requirements, exit. 268 * If not, wait for and pull a message off 269 * the read queue. 270 */ 271 vtLock(mbox.rlock); 272 if(mbox.nproc > mbox.maxproc){ 273 mbox.nproc--; 274 vtUnlock(mbox.rlock); 275 break; 276 } 277 while(mbox.rhead == nil) 278 vtSleep(mbox.rrendez); 279 m = mbox.rhead; 280 mbox.rhead = m->rwnext; 281 m->rwnext = nil; 282 vtUnlock(mbox.rlock); 283 284 con = m->con; 285 e = nil; 286 287 /* 288 * If the message has been flushed before 289 * any 9P processing has started, mark it so 290 * none will be attempted. 291 */ 292 vtLock(con->mlock); 293 if(m->state == MsgF) 294 e = "flushed"; 295 else 296 m->state = Msg9; 297 vtUnlock(con->mlock); 298 299 if(e == nil){ 300 /* 301 * explain this 302 */ 303 vtLock(con->lock); 304 if(m->t.type == Tversion){ 305 con->version = m; 306 con->state = ConDown; 307 while(con->mhead != m) 308 vtSleep(con->rendez); 309 assert(con->state == ConDown); 310 if(con->version == m){ 311 con->version = nil; 312 con->state = ConInit; 313 } 314 else 315 e = "Tversion aborted"; 316 } 317 else if(con->state != ConUp) 318 e = "connection not ready"; 319 vtUnlock(con->lock); 320 } 321 322 /* 323 * Dispatch if not error already. 324 */ 325 m->r.tag = m->t.tag; 326 if(e == nil && !(*rFcall[m->t.type])(m)) 327 e = vtGetError(); 328 if(e != nil){ 329 m->r.type = Rerror; 330 m->r.ename = e; 331 } 332 else 333 m->r.type = m->t.type+1; 334 335 /* 336 * Put the message (with reply) on the 337 * write queue and wakeup the write process. 338 */ 339 if(!m->nowq){ 340 vtLock(con->wlock); 341 if(con->whead == nil) 342 con->whead = m; 343 else 344 con->wtail->rwnext = m; 345 con->wtail = m; 346 vtWakeup(con->wrendez); 347 vtUnlock(con->wlock); 348 } 349 } 350 } 351 352 static void 353 msgRead(void* v) 354 { 355 Msg *m; 356 Con *con; 357 int eof, fd, n; 358 359 vtThreadSetName("msgRead"); 360 361 con = v; 362 fd = con->fd; 363 eof = 0; 364 365 while(!eof){ 366 m = msgAlloc(con); 367 368 while((n = read9pmsg(fd, m->data, con->msize)) == 0) 369 ; 370 if(n < 0){ 371 m->t.type = Tversion; 372 m->t.fid = NOFID; 373 m->t.tag = NOTAG; 374 m->t.msize = con->msize; 375 m->t.version = "9PEoF"; 376 eof = 1; 377 } 378 else if(convM2S(m->data, n, &m->t) != n){ 379 if(Dflag) 380 fprint(2, "msgRead: convM2S error: %s\n", 381 con->name); 382 msgFree(m); 383 continue; 384 } 385 if(Dflag) 386 fprint(2, "msgRead %p: t %F\n", con, &m->t); 387 388 vtLock(con->mlock); 389 if(con->mtail != nil){ 390 m->mprev = con->mtail; 391 con->mtail->mnext = m; 392 } 393 else{ 394 con->mhead = m; 395 m->mprev = nil; 396 } 397 con->mtail = m; 398 vtUnlock(con->mlock); 399 400 vtLock(mbox.rlock); 401 if(mbox.rhead == nil){ 402 mbox.rhead = m; 403 if(!vtWakeup(mbox.rrendez)){ 404 if(mbox.nproc < mbox.maxproc){ 405 if(vtThread(msgProc, nil) > 0) 406 mbox.nproc++; 407 } 408 else 409 mbox.nprocstarve++; 410 } 411 /* 412 * don't need this surely? 413 vtWakeup(mbox.rrendez); 414 */ 415 } 416 else 417 mbox.rtail->rwnext = m; 418 mbox.rtail = m; 419 vtUnlock(mbox.rlock); 420 } 421 } 422 423 static void 424 msgWrite(void* v) 425 { 426 Con *con; 427 int eof, n; 428 Msg *flush, *m; 429 430 vtThreadSetName("msgWrite"); 431 432 con = v; 433 if(vtThread(msgRead, con) < 0){ 434 conFree(con); 435 return; 436 } 437 438 for(;;){ 439 /* 440 * Wait for and pull a message off the write queue. 441 */ 442 vtLock(con->wlock); 443 while(con->whead == nil) 444 vtSleep(con->wrendez); 445 m = con->whead; 446 con->whead = m->rwnext; 447 m->rwnext = nil; 448 assert(!m->nowq); 449 vtUnlock(con->wlock); 450 451 eof = 0; 452 453 /* 454 * Write each message (if it hasn't been flushed) 455 * followed by any messages waiting for it to complete. 456 */ 457 vtLock(con->mlock); 458 while(m != nil){ 459 msgMunlink(m); 460 461 if(Dflag) 462 fprint(2, "msgWrite %d: r %F\n", 463 m->state, &m->r); 464 465 if(m->state != MsgF){ 466 m->state = MsgW; 467 vtUnlock(con->mlock); 468 469 n = convS2M(&m->r, con->data, con->msize); 470 if(write(con->fd, con->data, n) != n) 471 eof = 1; 472 473 vtLock(con->mlock); 474 } 475 476 if((flush = m->flush) != nil){ 477 assert(flush->nowq); 478 m->flush = nil; 479 } 480 msgFree(m); 481 m = flush; 482 } 483 vtUnlock(con->mlock); 484 485 vtLock(con->lock); 486 if(eof && con->fd >= 0){ 487 close(con->fd); 488 con->fd = -1; 489 } 490 if(con->state == ConDown) 491 vtWakeup(con->rendez); 492 if(con->state == ConMoribund && con->mhead == nil){ 493 vtUnlock(con->lock); 494 conFree(con); 495 break; 496 } 497 vtUnlock(con->lock); 498 } 499 } 500 501 Con* 502 conAlloc(int fd, char* name) 503 { 504 Con *con; 505 506 vtLock(cbox.alock); 507 while(cbox.ahead == nil){ 508 if(cbox.ncon >= cbox.maxcon){ 509 cbox.nconstarve++; 510 vtSleep(cbox.arendez); 511 continue; 512 } 513 con = vtMemAllocZ(sizeof(Con)); 514 con->lock = vtLockAlloc(); 515 con->rendez = vtRendezAlloc(con->lock); 516 con->data = vtMemAlloc(cbox.msize); 517 con->msize = cbox.msize; 518 con->alock = vtLockAlloc(); 519 con->mlock = vtLockAlloc(); 520 con->mrendez = vtRendezAlloc(con->mlock); 521 con->wlock = vtLockAlloc(); 522 con->wrendez = vtRendezAlloc(con->wlock); 523 con->fidlock = vtLockAlloc(); 524 525 cbox.ncon++; 526 cbox.ahead = con; 527 break; 528 } 529 con = cbox.ahead; 530 cbox.ahead = con->anext; 531 con->anext = nil; 532 533 if(cbox.ctail != nil){ 534 con->cprev = cbox.ctail; 535 cbox.ctail->cnext = con; 536 } 537 else{ 538 cbox.chead = con; 539 con->cprev = nil; 540 } 541 cbox.ctail = con; 542 543 assert(con->mhead == nil); 544 assert(con->whead == nil); 545 assert(con->fhead == nil); 546 assert(con->nfid == 0); 547 548 con->state = ConNew; 549 con->fd = fd; 550 if(con->name != nil){ 551 vtMemFree(con->name); 552 con->name = nil; 553 } 554 if(name != nil) 555 con->name = vtStrDup(name); 556 else 557 con->name = vtStrDup("unknown"); 558 con->aok = 0; 559 vtUnlock(cbox.alock); 560 561 if(vtThread(msgWrite, con) < 0){ 562 conFree(con); 563 return nil; 564 } 565 566 return con; 567 } 568 569 static int 570 cmdMsg(int argc, char* argv[]) 571 { 572 char *p; 573 char *usage = "usage: msg [-m nmsg] [-p nproc]"; 574 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve; 575 576 maxmsg = maxproc = 0; 577 578 ARGBEGIN{ 579 default: 580 return cliError(usage); 581 case 'm': 582 p = ARGF(); 583 if(p == nil) 584 return cliError(usage); 585 maxmsg = strtol(argv[0], &p, 0); 586 if(maxmsg <= 0 || p == argv[0] || *p != '\0') 587 return cliError(usage); 588 break; 589 case 'p': 590 p = ARGF(); 591 if(p == nil) 592 return cliError(usage); 593 maxproc = strtol(argv[0], &p, 0); 594 if(maxproc <= 0 || p == argv[0] || *p != '\0') 595 return cliError(usage); 596 break; 597 }ARGEND 598 if(argc) 599 return cliError(usage); 600 601 vtLock(mbox.alock); 602 if(maxmsg) 603 mbox.maxmsg = maxmsg; 604 maxmsg = mbox.maxmsg; 605 nmsg = mbox.nmsg; 606 nmsgstarve = mbox.nmsgstarve; 607 vtUnlock(mbox.alock); 608 609 vtLock(mbox.rlock); 610 if(maxproc) 611 mbox.maxproc = maxproc; 612 maxproc = mbox.maxproc; 613 nproc = mbox.nproc; 614 nprocstarve = mbox.nprocstarve; 615 vtUnlock(mbox.rlock); 616 617 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc); 618 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n", 619 nmsg, nmsgstarve, nproc, nprocstarve); 620 621 return 1; 622 } 623 624 static int 625 scmp(Fid *a, Fid *b) 626 { 627 if(a == 0) 628 return 1; 629 if(b == 0) 630 return -1; 631 return strcmp(a->uname, b->uname); 632 } 633 634 static Fid* 635 fidMerge(Fid *a, Fid *b) 636 { 637 Fid *s, **l; 638 639 l = &s; 640 while(a || b){ 641 if(scmp(a, b) < 0){ 642 *l = a; 643 l = &a->sort; 644 a = a->sort; 645 }else{ 646 *l = b; 647 l = &b->sort; 648 b = b->sort; 649 } 650 } 651 *l = 0; 652 return s; 653 } 654 655 static Fid* 656 fidMergeSort(Fid *f) 657 { 658 int delay; 659 Fid *a, *b; 660 661 if(f == nil) 662 return nil; 663 if(f->sort == nil) 664 return f; 665 666 a = b = f; 667 delay = 1; 668 while(a && b){ 669 if(delay) /* easy way to handle 2-element list */ 670 delay = 0; 671 else 672 a = a->sort; 673 if(b = b->sort) 674 b = b->sort; 675 } 676 677 b = a->sort; 678 a->sort = nil; 679 680 a = fidMergeSort(f); 681 b = fidMergeSort(b); 682 683 return fidMerge(a, b); 684 } 685 686 static int 687 cmdWho(int argc, char* argv[]) 688 { 689 char *usage = "usage: who"; 690 int i; 691 Con *con; 692 Fid *fid, *last; 693 694 ARGBEGIN{ 695 default: 696 return cliError(usage); 697 }ARGEND 698 699 if(argc > 0) 700 return cliError(usage); 701 702 vtRLock(cbox.clock); 703 for(con=cbox.chead; con; con=con->cnext){ 704 consPrint("\t%q:", con->name); 705 vtLock(con->fidlock); 706 last = nil; 707 for(i=0; i<NFidHash; i++) 708 for(fid=con->fidhash[i]; fid; fid=fid->hash) 709 if(fid->fidno != NOFID && fid->uname){ 710 fid->sort = last; 711 last = fid; 712 } 713 fid = fidMergeSort(last); 714 last = nil; 715 for(; fid; last=fid, fid=fid->sort) 716 if(last==nil || strcmp(fid->uname, last->uname) != 0) 717 consPrint(" %q", fid->uname); 718 vtUnlock(con->fidlock); 719 consPrint("\n"); 720 } 721 vtRUnlock(cbox.clock); 722 return 1; 723 } 724 725 void 726 msgInit(void) 727 { 728 mbox.alock = vtLockAlloc(); 729 mbox.arendez = vtRendezAlloc(mbox.alock); 730 731 mbox.rlock = vtLockAlloc(); 732 mbox.rrendez = vtRendezAlloc(mbox.rlock); 733 734 mbox.maxmsg = NMsgInit; 735 mbox.maxproc = NMsgProcInit; 736 mbox.msize = NMsizeInit; 737 738 cliAddCmd("msg", cmdMsg); 739 } 740 741 static int 742 cmdCon(int argc, char* argv[]) 743 { 744 char *p; 745 Con *con; 746 char *usage = "usage: con [-m ncon]"; 747 int maxcon, ncon, nconstarve; 748 749 maxcon = 0; 750 751 ARGBEGIN{ 752 default: 753 return cliError(usage); 754 case 'm': 755 p = ARGF(); 756 if(p == nil) 757 return cliError(usage); 758 maxcon = strtol(argv[0], &p, 0); 759 if(maxcon <= 0 || p == argv[0] || *p != '\0') 760 return cliError(usage); 761 break; 762 }ARGEND 763 if(argc) 764 return cliError(usage); 765 766 vtLock(cbox.clock); 767 if(maxcon) 768 cbox.maxcon = maxcon; 769 maxcon = cbox.maxcon; 770 ncon = cbox.ncon; 771 nconstarve = cbox.nconstarve; 772 vtUnlock(cbox.clock); 773 774 consPrint("\tcon -m %d\n", maxcon); 775 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve); 776 777 vtRLock(cbox.clock); 778 for(con = cbox.chead; con != nil; con = con->cnext){ 779 consPrint("\t%s\n", con->name); 780 } 781 vtRUnlock(cbox.clock); 782 783 return 1; 784 } 785 786 void 787 conInit(void) 788 { 789 cbox.alock = vtLockAlloc(); 790 cbox.arendez = vtRendezAlloc(cbox.alock); 791 792 cbox.clock = vtLockAlloc(); 793 794 cbox.maxcon = NConInit; 795 cbox.msize = NMsizeInit; 796 797 cliAddCmd("con", cmdCon); 798 cliAddCmd("who", cmdWho); 799 } 800