1 #include "stdinc.h" 2 3 #include "9.h" 4 #include "dat.h" 5 #include "fns.h" 6 7 enum { 8 NConInit = 128, 9 NMsgInit = 20, 10 NMsgProcInit = 4, 11 NMsizeInit = 8192+IOHDRSZ, 12 }; 13 14 static struct { 15 VtLock* alock; /* alloc */ 16 Msg* ahead; 17 VtRendez* arendez; 18 19 int maxmsg; 20 int nmsg; 21 int nmsgstarve; 22 23 VtLock* rlock; /* read */ 24 Msg* rhead; 25 Msg* rtail; 26 VtRendez* rrendez; 27 28 int maxproc; 29 int nproc; 30 int nprocstarve; 31 32 u32int msize; /* immutable */ 33 } mbox; 34 35 static struct { 36 VtLock* alock; /* alloc */ 37 Con* ahead; 38 VtRendez* arendez; 39 40 VtLock* clock; 41 Con* chead; 42 Con* ctail; 43 44 int maxcon; 45 int ncon; 46 int nconstarve; 47 48 u32int msize; 49 } cbox; 50 51 static void 52 conFree(Con* con) 53 { 54 assert(con->version == nil); 55 assert(con->mhead == nil); 56 assert(con->whead == nil); 57 assert(con->nfid == 0); 58 assert(con->state == ConMoribund); 59 60 if(con->fd >= 0){ 61 close(con->fd); 62 con->fd = -1; 63 } 64 con->state = ConDead; 65 66 vtLock(cbox.alock); 67 if(con->cprev != nil) 68 con->cprev->cnext = con->cnext; 69 else 70 cbox.chead = con->cnext; 71 if(con->cnext != nil) 72 con->cnext->cprev = con->cprev; 73 else 74 cbox.ctail = con->cprev; 75 con->cprev = con->cnext = nil; 76 77 if(cbox.ncon > cbox.maxcon){ 78 if(con->name != nil) 79 vtMemFree(con->name); 80 vtLockFree(con->fidlock); 81 vtMemFree(con->data); 82 vtRendezFree(con->wrendez); 83 vtLockFree(con->wlock); 84 vtRendezFree(con->mrendez); 85 vtLockFree(con->mlock); 86 vtRendezFree(con->rendez); 87 vtLockFree(con->lock); 88 vtMemFree(con); 89 cbox.ncon--; 90 vtUnlock(cbox.alock); 91 return; 92 } 93 con->anext = cbox.ahead; 94 cbox.ahead = con; 95 if(con->anext == nil) 96 vtWakeup(cbox.arendez); 97 vtUnlock(cbox.alock); 98 } 99 100 static void 101 msgFree(Msg* m) 102 { 103 assert(m->rwnext == nil); 104 assert(m->fnext == nil && m->fprev == nil); 105 106 vtLock(mbox.alock); 107 if(mbox.nmsg > mbox.maxmsg){ 108 vtMemFree(m->data); 109 vtMemFree(m); 110 mbox.nmsg--; 111 vtUnlock(mbox.alock); 112 return; 113 } 114 m->anext = mbox.ahead; 115 mbox.ahead = m; 116 if(m->anext == nil) 117 vtWakeup(mbox.arendez); 118 vtUnlock(mbox.alock); 119 } 120 121 static Msg* 122 msgAlloc(Con* con) 123 { 124 Msg *m; 125 126 vtLock(mbox.alock); 127 while(mbox.ahead == nil){ 128 if(mbox.nmsg >= mbox.maxmsg){ 129 mbox.nmsgstarve++; 130 vtSleep(mbox.arendez); 131 continue; 132 } 133 m = vtMemAllocZ(sizeof(Msg)); 134 m->data = vtMemAlloc(mbox.msize); 135 m->msize = mbox.msize; 136 mbox.nmsg++; 137 mbox.ahead = m; 138 break; 139 } 140 m = mbox.ahead; 141 mbox.ahead = m->anext; 142 m->anext = nil; 143 vtUnlock(mbox.alock); 144 145 m->con = con; 146 m->state = MsgR; 147 148 return m; 149 } 150 151 static void 152 msgMunlink(Msg* m) 153 { 154 Con *con; 155 156 con = m->con; 157 158 if(m->mprev != nil) 159 m->mprev->mnext = m->mnext; 160 else 161 con->mhead = m->mnext; 162 if(m->mnext != nil) 163 m->mnext->mprev = m->mprev; 164 else 165 con->mtail = m->mprev; 166 m->mprev = m->mnext = nil; 167 } 168 169 static void 170 msgUnlinkUnlockAndFree(Msg* m) 171 { 172 /* 173 * Unlink the message from the flush and message queues, 174 * unlock the connection message lock and free the message. 175 * Called with con->mlock locked. 176 */ 177 if(m->fprev != nil) 178 m->fprev->fnext = m->fnext; 179 if(m->fnext != nil) 180 m->fnext->fprev = m->fprev; 181 m->fprev = m->fnext = nil; 182 183 msgMunlink(m); 184 vtUnlock(m->con->mlock); 185 msgFree(m); 186 } 187 188 void 189 msgFlush(Msg* m) 190 { 191 Msg *old; 192 Con *con; 193 194 con = m->con; 195 196 fprint(2, "msgFlush %F\n", &m->t); 197 198 /* 199 * Look for the message to be flushed in the 200 * queue of all messages still on this connection. 201 */ 202 vtLock(con->mlock); 203 for(old = con->mhead; old != nil; old = old->mnext) 204 if(old->t.tag == m->t.oldtag) 205 break; 206 if(old == nil){ 207 if(Dflag) 208 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag); 209 vtUnlock(con->mlock); 210 return; 211 } 212 213 fprint(2, "\tmsgFlush found %F\n", &old->t); 214 215 /* 216 * Found it. 217 * 218 * Easy case is no 9P processing done yet, 219 * message is on the read queue. 220 * Mark the message as flushed and let the read 221 * process throw it away after after pulling 222 * it off the read queue. 223 */ 224 if(old->state == MsgR){ 225 old->state = MsgF; 226 if(Dflag) 227 fprint(2, "msgFlush: change %d from MsgR to MsgF\n", m->t.oldtag); 228 vtUnlock(con->mlock); 229 return; 230 } 231 232 /* 233 * Flushing flushes. 234 * Since they don't affect the server state, flushes 235 * can be deleted when in Msg9 or MsgW state. 236 */ 237 if(old->t.type == Tflush){ 238 /* 239 * For Msg9 state, the old message may 240 * or may not be on the write queue. 241 * Mark the message as flushed and let 242 * the write process throw it away after 243 * after pulling it off the write queue. 244 */ 245 if(old->state == Msg9){ 246 old->state = MsgF; 247 if(Dflag) 248 fprint(2, "msgFlush: change %d from Msg9 to MsgF\n", m->t.oldtag); 249 vtUnlock(con->mlock); 250 return; 251 } 252 assert(old->state == MsgW); 253 254 /* 255 * A flush in MsgW state implies it is waiting 256 * for its corresponding old message to be written, 257 * so it can be deleted right here, right now... 258 * right here, right now... right here, right now... 259 * right about now... the funk soul brother. 260 */ 261 if(Dflag) 262 fprint(2, "msgFlush: delete pending flush %F\n", &old->t); 263 msgUnlinkUnlockAndFree(old); 264 return; 265 } 266 267 /* 268 * Must wait for the old message to be written. 269 * Add m to old's flush queue. 270 * Old is the head of its own flush queue. 271 */ 272 m->fprev = old; 273 m->fnext = old->fnext; 274 if(m->fnext) 275 m->fnext->fprev = m; 276 old->fnext = m; 277 if(Dflag) 278 fprint(2, "msgFlush: add %d to %d queue\n", m->t.tag, old->t.tag); 279 vtUnlock(con->mlock); 280 } 281 282 static void 283 msgProc(void*) 284 { 285 Msg *m; 286 char *e; 287 Con *con; 288 289 vtThreadSetName("msgProc"); 290 291 for(;;){ 292 /* 293 * If surplus to requirements, exit. 294 * If not, wait for and pull a message off 295 * the read queue. 296 */ 297 vtLock(mbox.rlock); 298 if(mbox.nproc > mbox.maxproc){ 299 mbox.nproc--; 300 vtUnlock(mbox.rlock); 301 break; 302 } 303 while(mbox.rhead == nil) 304 vtSleep(mbox.rrendez); 305 m = mbox.rhead; 306 mbox.rhead = m->rwnext; 307 m->rwnext = nil; 308 vtUnlock(mbox.rlock); 309 310 con = m->con; 311 e = nil; 312 313 /* 314 * If the message has been flushed before any 315 * 9P processing has started, just throw it away. 316 */ 317 vtLock(con->mlock); 318 if(m->state == MsgF){ 319 msgUnlinkUnlockAndFree(m); 320 continue; 321 } 322 m->state = Msg9; 323 vtUnlock(con->mlock); 324 325 /* 326 * explain this 327 */ 328 vtLock(con->lock); 329 if(m->t.type == Tversion){ 330 con->version = m; 331 con->state = ConDown; 332 while(con->mhead != m) 333 vtSleep(con->rendez); 334 assert(con->state == ConDown); 335 if(con->version == m){ 336 con->version = nil; 337 con->state = ConInit; 338 } 339 else 340 e = "Tversion aborted"; 341 } 342 else if(con->state != ConUp) 343 e = "connection not ready"; 344 vtUnlock(con->lock); 345 346 /* 347 * Dispatch if not error already. 348 */ 349 m->r.tag = m->t.tag; 350 if(e == nil && !(*rFcall[m->t.type])(m)) 351 e = vtGetError(); 352 if(e != nil){ 353 m->r.type = Rerror; 354 m->r.ename = e; 355 } 356 else 357 m->r.type = m->t.type+1; 358 359 360 /* 361 * Put the message (with reply) on the 362 * write queue and wakeup the write process. 363 */ 364 vtLock(con->wlock); 365 if(con->whead == nil) 366 con->whead = m; 367 else 368 con->wtail->rwnext = m; 369 con->wtail = m; 370 vtWakeup(con->wrendez); 371 vtUnlock(con->wlock); 372 } 373 } 374 375 static void 376 msgRead(void* v) 377 { 378 Msg *m; 379 Con *con; 380 int eof, fd, n; 381 382 vtThreadSetName("msgRead"); 383 384 con = v; 385 fd = con->fd; 386 eof = 0; 387 388 while(!eof){ 389 m = msgAlloc(con); 390 391 while((n = read9pmsg(fd, m->data, con->msize)) == 0) 392 ; 393 if(n < 0){ 394 m->t.type = Tversion; 395 m->t.fid = NOFID; 396 m->t.tag = NOTAG; 397 m->t.msize = con->msize; 398 m->t.version = "9PEoF"; 399 eof = 1; 400 } 401 else if(convM2S(m->data, n, &m->t) != n){ 402 if(Dflag) 403 fprint(2, "msgRead: convM2S error: %s\n", 404 con->name); 405 msgFree(m); 406 continue; 407 } 408 if(Dflag) 409 fprint(2, "msgRead %p: t %F\n", con, &m->t); 410 411 vtLock(con->mlock); 412 if(con->mtail != nil){ 413 m->mprev = con->mtail; 414 con->mtail->mnext = m; 415 } 416 else{ 417 con->mhead = m; 418 m->mprev = nil; 419 } 420 con->mtail = m; 421 vtUnlock(con->mlock); 422 423 vtLock(mbox.rlock); 424 if(mbox.rhead == nil){ 425 mbox.rhead = m; 426 if(!vtWakeup(mbox.rrendez)){ 427 if(mbox.nproc < mbox.maxproc){ 428 if(vtThread(msgProc, nil) > 0) 429 mbox.nproc++; 430 } 431 else 432 mbox.nprocstarve++; 433 } 434 /* 435 * don't need this surely? 436 vtWakeup(mbox.rrendez); 437 */ 438 } 439 else 440 mbox.rtail->rwnext = m; 441 mbox.rtail = m; 442 vtUnlock(mbox.rlock); 443 } 444 } 445 446 static int 447 _msgWrite(Msg* m) 448 { 449 Con *con; 450 int eof, n; 451 452 con = m->con; 453 454 /* 455 * An Rflush with a .fprev implies it is on a flush queue waiting for 456 * its corresponding 'oldtag' message to go out first, so punt 457 * until the 'oldtag' message goes out (see below). 458 */ 459 if(m->r.type == Rflush && m->fprev != nil){ 460 fprint(2, "msgWrite %p: delay r %F\n", con, &m->r); 461 return 0; 462 } 463 464 msgMunlink(m); 465 vtUnlock(con->mlock); 466 467 /* 468 * TODO: optimise this copy away somehow for 469 * read, stat, etc. 470 */ 471 assert(n = convS2M(&m->r, con->data, con->msize)); 472 if(write(con->fd, con->data, n) != n) 473 eof = 1; 474 else 475 eof = 0; 476 477 if(Dflag) 478 fprint(2, "msgWrite %p: r %F\n", con, &m->r); 479 480 /* 481 * Just wrote a reply. If it has any flushes waiting 482 * for it to have gone out, recurse down the list writing 483 * them out too. 484 */ 485 vtLock(con->mlock); 486 if(m->fnext != nil){ 487 m->fnext->fprev = nil; 488 eof += _msgWrite(m->fnext); 489 m->fnext = nil; 490 } 491 msgFree(m); 492 493 return eof; 494 } 495 496 static void 497 msgWrite(void* v) 498 { 499 Msg *m; 500 int eof; 501 Con *con; 502 503 vtThreadSetName("msgWrite"); 504 505 con = v; 506 if(vtThread(msgRead, con) < 0){ 507 conFree(con); 508 return; 509 } 510 511 for(;;){ 512 /* 513 * Wait for and pull a message off the write queue. 514 */ 515 vtLock(con->wlock); 516 while(con->whead == nil) 517 vtSleep(con->wrendez); 518 m = con->whead; 519 con->whead = m->rwnext; 520 m->rwnext = nil; 521 vtUnlock(con->wlock); 522 523 /* 524 * Throw the message away if it's a flushed flush, 525 * otherwise change its state and try to write it out. 526 */ 527 vtLock(con->mlock); 528 if(m->state == MsgF){ 529 assert(m->t.type == Tflush); 530 msgUnlinkUnlockAndFree(m); 531 continue; 532 } 533 m->state = MsgW; 534 eof = _msgWrite(m); 535 vtUnlock(con->mlock); 536 537 vtLock(con->lock); 538 if(eof && con->fd >= 0){ 539 close(con->fd); 540 con->fd = -1; 541 } 542 if(con->state == ConDown) 543 vtWakeup(con->rendez); 544 if(con->state == ConMoribund && con->mhead == nil){ 545 vtUnlock(con->lock); 546 conFree(con); 547 break; 548 } 549 vtUnlock(con->lock); 550 } 551 } 552 553 Con* 554 conAlloc(int fd, char* name) 555 { 556 Con *con; 557 558 vtLock(cbox.alock); 559 while(cbox.ahead == nil){ 560 if(cbox.ncon >= cbox.maxcon){ 561 cbox.nconstarve++; 562 vtSleep(cbox.arendez); 563 continue; 564 } 565 con = vtMemAllocZ(sizeof(Con)); 566 con->lock = vtLockAlloc(); 567 con->rendez = vtRendezAlloc(con->lock); 568 con->data = vtMemAlloc(cbox.msize); 569 con->msize = cbox.msize; 570 con->alock = vtLockAlloc(); 571 con->mlock = vtLockAlloc(); 572 con->mrendez = vtRendezAlloc(con->mlock); 573 con->wlock = vtLockAlloc(); 574 con->wrendez = vtRendezAlloc(con->wlock); 575 con->fidlock = vtLockAlloc(); 576 577 cbox.ncon++; 578 cbox.ahead = con; 579 break; 580 } 581 con = cbox.ahead; 582 cbox.ahead = con->anext; 583 con->anext = nil; 584 585 if(cbox.ctail != nil){ 586 con->cprev = cbox.ctail; 587 cbox.ctail->cnext = con; 588 } 589 else{ 590 cbox.chead = con; 591 con->cprev = nil; 592 } 593 cbox.ctail = con; 594 595 assert(con->mhead == nil); 596 assert(con->whead == nil); 597 assert(con->fhead == nil); 598 assert(con->nfid == 0); 599 600 con->state = ConNew; 601 con->fd = fd; 602 if(con->name != nil){ 603 vtMemFree(con->name); 604 con->name = nil; 605 } 606 if(name != nil) 607 con->name = vtStrDup(name); 608 else 609 con->name = vtStrDup("unknown"); 610 con->aok = 0; 611 vtUnlock(cbox.alock); 612 613 if(vtThread(msgWrite, con) < 0){ 614 conFree(con); 615 return nil; 616 } 617 618 return con; 619 } 620 621 static int 622 cmdMsg(int argc, char* argv[]) 623 { 624 char *p; 625 char *usage = "usage: msg [-m nmsg] [-p nproc]"; 626 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve; 627 628 maxmsg = maxproc = 0; 629 630 ARGBEGIN{ 631 default: 632 return cliError(usage); 633 case 'm': 634 p = ARGF(); 635 if(p == nil) 636 return cliError(usage); 637 maxmsg = strtol(argv[0], &p, 0); 638 if(maxmsg <= 0 || p == argv[0] || *p != '\0') 639 return cliError(usage); 640 break; 641 case 'p': 642 p = ARGF(); 643 if(p == nil) 644 return cliError(usage); 645 maxproc = strtol(argv[0], &p, 0); 646 if(maxproc <= 0 || p == argv[0] || *p != '\0') 647 return cliError(usage); 648 break; 649 }ARGEND 650 if(argc) 651 return cliError(usage); 652 653 vtLock(mbox.alock); 654 if(maxmsg) 655 mbox.maxmsg = maxmsg; 656 maxmsg = mbox.maxmsg; 657 nmsg = mbox.nmsg; 658 nmsgstarve = mbox.nmsgstarve; 659 vtUnlock(mbox.alock); 660 661 vtLock(mbox.rlock); 662 if(maxproc) 663 mbox.maxproc = maxproc; 664 maxproc = mbox.maxproc; 665 nproc = mbox.nproc; 666 nprocstarve = mbox.nprocstarve; 667 vtUnlock(mbox.rlock); 668 669 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc); 670 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n", 671 nmsg, nmsgstarve, nproc, nprocstarve); 672 673 return 1; 674 } 675 676 static int 677 scmp(Fid *a, Fid *b) 678 { 679 if(a == 0) 680 return 1; 681 if(b == 0) 682 return -1; 683 return strcmp(a->uname, b->uname); 684 } 685 686 static Fid* 687 fidMerge(Fid *a, Fid *b) 688 { 689 Fid *s, **l; 690 691 l = &s; 692 while(a || b){ 693 if(scmp(a, b) < 0){ 694 *l = a; 695 l = &a->sort; 696 a = a->sort; 697 }else{ 698 *l = b; 699 l = &b->sort; 700 b = b->sort; 701 } 702 } 703 *l = 0; 704 return s; 705 } 706 707 static Fid* 708 fidMergeSort(Fid *f) 709 { 710 int delay; 711 Fid *a, *b; 712 713 if(f == nil) 714 return nil; 715 if(f->sort == nil) 716 return f; 717 718 a = b = f; 719 delay = 1; 720 while(a && b){ 721 if(delay) /* easy way to handle 2-element list */ 722 delay = 0; 723 else 724 a = a->sort; 725 if(b = b->sort) 726 b = b->sort; 727 } 728 729 b = a->sort; 730 a->sort = nil; 731 732 a = fidMergeSort(f); 733 b = fidMergeSort(b); 734 735 return fidMerge(a, b); 736 } 737 738 static int 739 cmdWho(int argc, char* argv[]) 740 { 741 char *usage = "usage: who"; 742 int i; 743 Con *con; 744 Fid *fid, *last; 745 746 ARGBEGIN{ 747 default: 748 return cliError(usage); 749 }ARGEND 750 751 if(argc > 0) 752 return cliError(usage); 753 754 vtRLock(cbox.clock); 755 for(con=cbox.chead; con; con=con->cnext){ 756 consPrint("\t%q:", con->name); 757 vtLock(con->fidlock); 758 last = nil; 759 for(i=0; i<NFidHash; i++) 760 for(fid=con->fidhash[i]; fid; fid=fid->hash) 761 if(fid->fidno != NOFID && fid->uname){ 762 fid->sort = last; 763 last = fid; 764 } 765 fid = fidMergeSort(last); 766 last = nil; 767 for(; fid; last=fid, fid=fid->sort) 768 if(last==nil || strcmp(fid->uname, last->uname) != 0) 769 consPrint(" %q", fid->uname); 770 vtUnlock(con->fidlock); 771 consPrint("\n"); 772 } 773 vtRUnlock(cbox.clock); 774 return 1; 775 } 776 777 void 778 msgInit(void) 779 { 780 mbox.alock = vtLockAlloc(); 781 mbox.arendez = vtRendezAlloc(mbox.alock); 782 783 mbox.rlock = vtLockAlloc(); 784 mbox.rrendez = vtRendezAlloc(mbox.rlock); 785 786 mbox.maxmsg = NMsgInit; 787 mbox.maxproc = NMsgProcInit; 788 mbox.msize = NMsizeInit; 789 790 cliAddCmd("msg", cmdMsg); 791 } 792 793 static int 794 cmdCon(int argc, char* argv[]) 795 { 796 char *p; 797 Con *con; 798 char *usage = "usage: con [-m ncon]"; 799 int maxcon, ncon, nconstarve; 800 801 maxcon = 0; 802 803 ARGBEGIN{ 804 default: 805 return cliError(usage); 806 case 'm': 807 p = ARGF(); 808 if(p == nil) 809 return cliError(usage); 810 maxcon = strtol(argv[0], &p, 0); 811 if(maxcon <= 0 || p == argv[0] || *p != '\0') 812 return cliError(usage); 813 break; 814 }ARGEND 815 if(argc) 816 return cliError(usage); 817 818 vtLock(cbox.clock); 819 if(maxcon) 820 cbox.maxcon = maxcon; 821 maxcon = cbox.maxcon; 822 ncon = cbox.ncon; 823 nconstarve = cbox.nconstarve; 824 vtUnlock(cbox.clock); 825 826 consPrint("\tcon -m %d\n", maxcon); 827 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve); 828 829 vtRLock(cbox.clock); 830 for(con = cbox.chead; con != nil; con = con->cnext){ 831 consPrint("\t%s\n", con->name); 832 } 833 vtRUnlock(cbox.clock); 834 835 return 1; 836 } 837 838 void 839 conInit(void) 840 { 841 cbox.alock = vtLockAlloc(); 842 cbox.arendez = vtRendezAlloc(cbox.alock); 843 844 cbox.clock = vtLockAlloc(); 845 846 cbox.maxcon = NConInit; 847 cbox.msize = NMsizeInit; 848 849 cliAddCmd("con", cmdCon); 850 cliAddCmd("who", cmdWho); 851 } 852