1 #include "stdinc.h" 2 3 #include "9.h" 4 #include "dat.h" 5 #include "fns.h" 6 7 enum { 8 NConInit = 128, 9 NMsgInit = 20, 10 NMsgProcInit = 4, 11 NMsizeInit = 8192+IOHDRSZ, 12 }; 13 14 static struct { 15 VtLock* alock; /* alloc */ 16 Msg* ahead; 17 VtRendez* arendez; 18 19 int maxmsg; 20 int nmsg; 21 int nmsgstarve; 22 23 VtLock* rlock; /* read */ 24 Msg* rhead; 25 Msg* rtail; 26 VtRendez* rrendez; 27 28 int maxproc; 29 int nproc; 30 int nprocstarve; 31 32 u32int msize; /* immutable */ 33 } mbox; 34 35 static struct { 36 VtLock* alock; /* alloc */ 37 Con* ahead; 38 VtRendez* arendez; 39 40 VtLock* clock; 41 Con* chead; 42 Con* ctail; 43 44 int maxcon; 45 int ncon; 46 int nconstarve; 47 48 u32int msize; 49 } cbox; 50 51 static void 52 conFree(Con* con) 53 { 54 assert(con->version == nil); 55 assert(con->mhead == nil); 56 assert(con->whead == nil); 57 assert(con->nfid == 0); 58 assert(con->state == ConMoribund); 59 60 if(con->fd >= 0){ 61 close(con->fd); 62 con->fd = -1; 63 } 64 con->state = ConDead; 65 66 vtLock(cbox.alock); 67 if(con->cprev != nil) 68 con->cprev->cnext = con->cnext; 69 else 70 cbox.chead = con->cnext; 71 if(con->cnext != nil) 72 con->cnext->cprev = con->cprev; 73 else 74 cbox.ctail = con->cprev; 75 con->cprev = con->cnext = nil; 76 77 if(cbox.ncon > cbox.maxcon){ 78 if(con->name != nil) 79 vtMemFree(con->name); 80 vtLockFree(con->fidlock); 81 vtMemFree(con->data); 82 vtRendezAlloc(con->wrendez); 83 vtLockFree(con->wlock); 84 vtRendezAlloc(con->mrendez); 85 vtLockFree(con->mlock); 86 vtRendezAlloc(con->rendez); 87 vtLockFree(con->lock); 88 vtMemFree(con); 89 cbox.ncon--; 90 vtUnlock(cbox.alock); 91 return; 92 } 93 con->anext = cbox.ahead; 94 cbox.ahead = con; 95 if(con->anext == nil) 96 vtWakeup(cbox.arendez); 97 vtUnlock(cbox.alock); 98 } 99 100 static void 101 msgFree(Msg* m) 102 { 103 assert(m->rwnext == nil); 104 assert(m->fnext == nil && m->fprev == nil); 105 106 vtLock(mbox.alock); 107 if(mbox.nmsg > mbox.maxmsg){ 108 vtMemFree(m->data); 109 vtMemFree(m); 110 mbox.nmsg--; 111 vtUnlock(mbox.alock); 112 return; 113 } 114 m->anext = mbox.ahead; 115 mbox.ahead = m; 116 if(m->anext == nil) 117 vtWakeup(mbox.arendez); 118 vtUnlock(mbox.alock); 119 } 120 121 static Msg* 122 msgAlloc(Con* con) 123 { 124 Msg *m; 125 126 vtLock(mbox.alock); 127 while(mbox.ahead == nil){ 128 if(mbox.nmsg >= mbox.maxmsg){ 129 mbox.nmsgstarve++; 130 vtSleep(mbox.arendez); 131 continue; 132 } 133 m = vtMemAllocZ(sizeof(Msg)); 134 m->data = vtMemAlloc(mbox.msize); 135 m->msize = mbox.msize; 136 mbox.nmsg++; 137 mbox.ahead = m; 138 break; 139 } 140 m = mbox.ahead; 141 mbox.ahead = m->anext; 142 m->anext = nil; 143 vtUnlock(mbox.alock); 144 145 m->con = con; 146 m->state = MsgR; 147 148 return m; 149 } 150 151 static void 152 msgMunlink(Msg* m) 153 { 154 Con *con; 155 156 con = m->con; 157 158 if(m->mprev != nil) 159 m->mprev->mnext = m->mnext; 160 else 161 con->mhead = m->mnext; 162 if(m->mnext != nil) 163 m->mnext->mprev = m->mprev; 164 else 165 con->mtail = m->mprev; 166 m->mprev = m->mnext = nil; 167 } 168 169 static void 170 msgUnlinkUnlockAndFree(Msg* m) 171 { 172 /* 173 * Unlink the message from the flush and message queues, 174 * unlock the connection message lock and free the message. 175 * Called with con->mlock locked. 176 */ 177 if(m->fprev != nil) 178 m->fprev->fnext = m->fnext; 179 if(m->fnext != nil) 180 m->fnext->fprev = m->fprev; 181 m->fprev = m->fnext = nil; 182 183 msgMunlink(m); 184 vtUnlock(m->con->mlock); 185 msgFree(m); 186 } 187 188 void 189 msgFlush(Msg* m) 190 { 191 Msg *old; 192 Con *con; 193 194 con = m->con; 195 196 /* 197 * Look for the message to be flushed in the 198 * queue of all messages still on this connection. 199 */ 200 vtLock(con->mlock); 201 for(old = con->mhead; old != nil; old = old->mnext) 202 if(old->t.tag == m->t.oldtag) 203 break; 204 if(old == nil){ 205 if(Dflag) 206 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag); 207 vtUnlock(con->mlock); 208 return; 209 } 210 211 /* 212 * Found it. 213 * 214 * Easy case is no 9P processing done yet, 215 * message is on the read queue. 216 * Mark the message as flushed and let the read 217 * process throw it away after after pulling 218 * it off the read queue. 219 */ 220 if(old->state == MsgR){ 221 old->state = MsgF; 222 if(Dflag) 223 fprint(2, "msgFlush: change %d from MsgR to MsgF\n", m->t.oldtag); 224 vtUnlock(con->mlock); 225 return; 226 } 227 228 /* 229 * Flushing flushes. 230 * Since they don't affect the server state, flushes 231 * can be deleted when in Msg9 or MsgW state. 232 */ 233 if(old->t.type == Tflush){ 234 /* 235 * For Msg9 state, the old message may 236 * or may not be on the write queue. 237 * Mark the message as flushed and let 238 * the write process throw it away after 239 * after pulling it off the write queue. 240 */ 241 if(old->state == Msg9){ 242 old->state = MsgF; 243 if(Dflag) 244 fprint(2, "msgFlush: change %d from Msg9 to MsgF\n", m->t.oldtag); 245 vtUnlock(con->mlock); 246 return; 247 } 248 assert(old->state == MsgW); 249 250 /* 251 * A flush in MsgW state implies it is waiting 252 * for its corresponding old message to be written, 253 * so it can be deleted right here, right now... 254 * right here, right now... right here, right now... 255 * right about now... the funk soul brother. 256 */ 257 if(Dflag) 258 fprint(2, "msgFlush: delete pending flush %F\n", &old->t); 259 msgUnlinkUnlockAndFree(old); 260 return; 261 } 262 263 /* 264 * Must wait for the old message to be written. 265 * Add m to old's flush queue. 266 * Old is the head of its own flush queue. 267 */ 268 m->fprev = old; 269 m->fnext = old->fnext; 270 if(m->fnext) 271 m->fnext->fprev = m; 272 old->fnext = m; 273 if(Dflag) 274 fprint(2, "msgFlush: add %d to %d queue\n", m->t.tag, old->t.tag); 275 vtUnlock(con->mlock); 276 } 277 278 static void 279 msgProc(void*) 280 { 281 Msg *m; 282 char *e; 283 Con *con; 284 285 vtThreadSetName("msgProc"); 286 287 for(;;){ 288 /* 289 * If surplus to requirements, exit. 290 * If not, wait for and pull a message off 291 * the read queue. 292 */ 293 vtLock(mbox.rlock); 294 if(mbox.nproc > mbox.maxproc){ 295 mbox.nproc--; 296 vtUnlock(mbox.rlock); 297 break; 298 } 299 while(mbox.rhead == nil) 300 vtSleep(mbox.rrendez); 301 m = mbox.rhead; 302 mbox.rhead = m->rwnext; 303 m->rwnext = nil; 304 vtUnlock(mbox.rlock); 305 306 con = m->con; 307 e = nil; 308 309 /* 310 * If the message has been flushed before any 311 * 9P processing has started, just throw it away. 312 */ 313 vtLock(con->mlock); 314 if(m->state == MsgF){ 315 msgUnlinkUnlockAndFree(m); 316 continue; 317 } 318 m->state = Msg9; 319 vtUnlock(con->mlock); 320 321 /* 322 * explain this 323 */ 324 vtLock(con->lock); 325 if(m->t.type == Tversion){ 326 con->version = m; 327 con->state = ConDown; 328 while(con->mhead != m) 329 vtSleep(con->rendez); 330 assert(con->state == ConDown); 331 if(con->version == m){ 332 con->version = nil; 333 con->state = ConInit; 334 } 335 else 336 e = "Tversion aborted"; 337 } 338 else if(con->state != ConUp) 339 e = "connection not ready"; 340 vtUnlock(con->lock); 341 342 /* 343 * Dispatch if not error already. 344 */ 345 m->r.tag = m->t.tag; 346 if(e == nil && !(*rFcall[m->t.type])(m)) 347 e = vtGetError(); 348 if(e != nil){ 349 m->r.type = Rerror; 350 m->r.ename = e; 351 } 352 else 353 m->r.type = m->t.type+1; 354 355 356 /* 357 * Put the message (with reply) on the 358 * write queue and wakeup the write process. 359 */ 360 vtLock(con->wlock); 361 if(con->whead == nil) 362 con->whead = m; 363 else 364 con->wtail->rwnext = m; 365 con->wtail = m; 366 vtWakeup(con->wrendez); 367 vtUnlock(con->wlock); 368 } 369 } 370 371 static void 372 msgRead(void* v) 373 { 374 Msg *m; 375 Con *con; 376 int eof, fd, n; 377 378 vtThreadSetName("msgRead"); 379 380 con = v; 381 fd = con->fd; 382 eof = 0; 383 384 while(!eof){ 385 m = msgAlloc(con); 386 387 while((n = read9pmsg(fd, m->data, con->msize)) == 0) 388 ; 389 if(n < 0){ 390 m->t.type = Tversion; 391 m->t.fid = NOFID; 392 m->t.tag = NOTAG; 393 m->t.msize = con->msize; 394 m->t.version = "9PEoF"; 395 eof = 1; 396 } 397 else if(convM2S(m->data, n, &m->t) != n){ 398 if(Dflag) 399 fprint(2, "msgRead: convM2S error: %s\n", 400 con->name); 401 msgFree(m); 402 continue; 403 } 404 if(Dflag) 405 fprint(2, "msgRead: t %F\n", &m->t); 406 407 vtLock(con->mlock); 408 if(con->mtail != nil){ 409 m->mprev = con->mtail; 410 con->mtail->mnext = m; 411 } 412 else{ 413 con->mhead = m; 414 m->mprev = nil; 415 } 416 con->mtail = m; 417 vtUnlock(con->mlock); 418 419 vtLock(mbox.rlock); 420 if(mbox.rhead == nil){ 421 mbox.rhead = m; 422 if(!vtWakeup(mbox.rrendez)){ 423 if(mbox.nproc < mbox.maxproc){ 424 if(vtThread(msgProc, nil) > 0) 425 mbox.nproc++; 426 } 427 else 428 mbox.nprocstarve++; 429 } 430 /* 431 * don't need this surely? 432 vtWakeup(mbox.rrendez); 433 */ 434 } 435 else 436 mbox.rtail->rwnext = m; 437 mbox.rtail = m; 438 vtUnlock(mbox.rlock); 439 } 440 } 441 442 static int 443 _msgWrite(Msg* m) 444 { 445 Con *con; 446 int eof, n; 447 448 con = m->con; 449 450 /* 451 * An Rflush with a .fprev implies it is on a flush queue waiting for 452 * its corresponding 'oldtag' message to go out first, so punt 453 * until the 'oldtag' message goes out (see below). 454 */ 455 if(m->r.type == Rflush && m->fprev != nil){ 456 fprint(2, "msgWrite: delay r %F\n", &m->r); 457 return 0; 458 } 459 460 msgMunlink(m); 461 vtUnlock(con->mlock); 462 463 /* 464 * TODO: optimise this copy away somehow for 465 * read, stat, etc. 466 */ 467 assert(n = convS2M(&m->r, con->data, con->msize)); 468 if(write(con->fd, con->data, n) != n) 469 eof = 1; 470 else 471 eof = 0; 472 473 if(Dflag) 474 fprint(2, "msgWrite: r %F\n", &m->r); 475 476 /* 477 * Just wrote a reply. If it has any flushes waiting 478 * for it to have gone out, recurse down the list writing 479 * them out too. 480 */ 481 vtLock(con->mlock); 482 if(m->fnext != nil){ 483 m->fnext->fprev = nil; 484 eof += _msgWrite(m->fnext); 485 m->fnext = nil; 486 } 487 msgFree(m); 488 489 return eof; 490 } 491 492 static void 493 msgWrite(void* v) 494 { 495 Msg *m; 496 int eof; 497 Con *con; 498 499 vtThreadSetName("msgWrite"); 500 501 con = v; 502 if(vtThread(msgRead, con) < 0){ 503 conFree(con); 504 return; 505 } 506 507 for(;;){ 508 /* 509 * Wait for and pull a message off the write queue. 510 */ 511 vtLock(con->wlock); 512 while(con->whead == nil) 513 vtSleep(con->wrendez); 514 m = con->whead; 515 con->whead = m->rwnext; 516 m->rwnext = nil; 517 vtUnlock(con->wlock); 518 519 /* 520 * Throw the message away if it's a flushed flush, 521 * otherwise change its state and try to write it out. 522 */ 523 vtLock(con->mlock); 524 if(m->state == MsgF){ 525 assert(m->t.type == Tflush); 526 msgUnlinkUnlockAndFree(m); 527 continue; 528 } 529 m->state = MsgW; 530 eof = _msgWrite(m); 531 vtUnlock(con->mlock); 532 533 vtLock(con->lock); 534 if(eof && con->fd >= 0){ 535 close(con->fd); 536 con->fd = -1; 537 } 538 if(con->state == ConDown) 539 vtWakeup(con->rendez); 540 if(con->state == ConMoribund && con->mhead == nil){ 541 vtUnlock(con->lock); 542 conFree(con); 543 break; 544 } 545 vtUnlock(con->lock); 546 } 547 } 548 549 Con* 550 conAlloc(int fd, char* name) 551 { 552 Con *con; 553 554 vtLock(cbox.alock); 555 while(cbox.ahead == nil){ 556 if(cbox.ncon >= cbox.maxcon){ 557 cbox.nconstarve++; 558 vtSleep(cbox.arendez); 559 continue; 560 } 561 con = vtMemAllocZ(sizeof(Con)); 562 con->lock = vtLockAlloc(); 563 con->rendez = vtRendezAlloc(con->lock); 564 con->data = vtMemAlloc(cbox.msize); 565 con->msize = cbox.msize; 566 con->alock = vtLockAlloc(); 567 con->mlock = vtLockAlloc(); 568 con->mrendez = vtRendezAlloc(con->mlock); 569 con->wlock = vtLockAlloc(); 570 con->wrendez = vtRendezAlloc(con->wlock); 571 con->fidlock = vtLockAlloc(); 572 573 cbox.ncon++; 574 cbox.ahead = con; 575 break; 576 } 577 con = cbox.ahead; 578 cbox.ahead = con->anext; 579 con->anext = nil; 580 581 if(cbox.ctail != nil){ 582 con->cprev = cbox.ctail; 583 cbox.ctail->cnext = con; 584 } 585 else{ 586 cbox.chead = con; 587 con->cprev = nil; 588 } 589 cbox.ctail = con; 590 591 assert(con->mhead == nil); 592 assert(con->whead == nil); 593 assert(con->fhead == nil); 594 assert(con->nfid == 0); 595 596 con->state = ConNew; 597 con->fd = fd; 598 if(con->name != nil){ 599 vtMemFree(con->name); 600 con->name = nil; 601 } 602 if(name != nil) 603 con->name = vtStrDup(name); 604 else 605 con->name = vtStrDup("unknown"); 606 con->aok = 0; 607 vtUnlock(cbox.alock); 608 609 if(vtThread(msgWrite, con) < 0){ 610 conFree(con); 611 return nil; 612 } 613 614 return con; 615 } 616 617 static int 618 cmdMsg(int argc, char* argv[]) 619 { 620 char *p; 621 char *usage = "usage: msg [-m nmsg] [-p nproc]"; 622 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve; 623 624 maxmsg = maxproc = 0; 625 626 ARGBEGIN{ 627 default: 628 return cliError(usage); 629 case 'm': 630 p = ARGF(); 631 if(p == nil) 632 return cliError(usage); 633 maxmsg = strtol(argv[0], &p, 0); 634 if(maxmsg <= 0 || p == argv[0] || *p != '\0') 635 return cliError(usage); 636 break; 637 case 'p': 638 p = ARGF(); 639 if(p == nil) 640 return cliError(usage); 641 maxproc = strtol(argv[0], &p, 0); 642 if(maxproc <= 0 || p == argv[0] || *p != '\0') 643 return cliError(usage); 644 break; 645 }ARGEND 646 if(argc) 647 return cliError(usage); 648 649 vtLock(mbox.alock); 650 if(maxmsg) 651 mbox.maxmsg = maxmsg; 652 maxmsg = mbox.maxmsg; 653 nmsg = mbox.nmsg; 654 nmsgstarve = mbox.nmsgstarve; 655 vtUnlock(mbox.alock); 656 657 vtLock(mbox.rlock); 658 if(maxproc) 659 mbox.maxproc = maxproc; 660 maxproc = mbox.maxproc; 661 nproc = mbox.nproc; 662 nprocstarve = mbox.nprocstarve; 663 vtUnlock(mbox.rlock); 664 665 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc); 666 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n", 667 nmsg, nmsgstarve, nproc, nprocstarve); 668 669 return 1; 670 } 671 672 void 673 msgInit(void) 674 { 675 mbox.alock = vtLockAlloc(); 676 mbox.arendez = vtRendezAlloc(mbox.alock); 677 678 mbox.rlock = vtLockAlloc(); 679 mbox.rrendez = vtRendezAlloc(mbox.rlock); 680 681 mbox.maxmsg = NMsgInit; 682 mbox.maxproc = NMsgProcInit; 683 mbox.msize = NMsizeInit; 684 685 cliAddCmd("msg", cmdMsg); 686 } 687 688 static int 689 cmdCon(int argc, char* argv[]) 690 { 691 char *p; 692 Con *con; 693 char *usage = "usage: con [-m ncon]"; 694 int maxcon, ncon, nconstarve; 695 696 maxcon = 0; 697 698 ARGBEGIN{ 699 default: 700 return cliError(usage); 701 case 'm': 702 p = ARGF(); 703 if(p == nil) 704 return cliError(usage); 705 maxcon = strtol(argv[0], &p, 0); 706 if(maxcon <= 0 || p == argv[0] || *p != '\0') 707 return cliError(usage); 708 break; 709 }ARGEND 710 if(argc) 711 return cliError(usage); 712 713 vtLock(cbox.clock); 714 if(maxcon) 715 cbox.maxcon = maxcon; 716 maxcon = cbox.maxcon; 717 ncon = cbox.ncon; 718 nconstarve = cbox.nconstarve; 719 vtUnlock(cbox.clock); 720 721 consPrint("\tcon -m %d\n", maxcon); 722 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve); 723 724 vtRLock(cbox.clock); 725 for(con = cbox.chead; con != nil; con = con->cnext){ 726 consPrint("\t%s\n", con->name); 727 } 728 vtRUnlock(cbox.clock); 729 730 return 1; 731 } 732 733 void 734 conInit(void) 735 { 736 cbox.alock = vtLockAlloc(); 737 cbox.arendez = vtRendezAlloc(cbox.alock); 738 739 cbox.clock = vtLockAlloc(); 740 741 cbox.maxcon = NConInit; 742 cbox.msize = NMsizeInit; 743 744 cliAddCmd("con", cmdCon); 745 } 746