1 #include "stdinc.h"
2
3 #include "9.h"
4 #include "dat.h"
5 #include "fns.h"
6
/*
 * Initial limits and sizes.  msgInit and conInit copy these
 * into mbox and cbox; the limits can later be changed from
 * the console with the "msg" and "con" commands.
 */
enum {
	NConInit = 128,			/* initial cbox.maxcon */
	NMsgInit = 384,			/* initial mbox.maxmsg */
	NMsgProcInit = 64,		/* initial mbox.maxproc */
	NMsizeInit = 8192+IOHDRSZ,	/* initial mbox.msize and cbox.msize */
};
13
/*
 * Global message state: a pool of free Msg structures and the
 * queue of messages that have been read and await a msgProc.
 */
static struct {
	VtLock* alock; /* alloc */	/* held by msgAlloc, msgFree, cmdMsg */
	Msg* ahead;			/* free list, linked through Msg.anext */
	VtRendez* arendez;		/* msgAlloc sleeps here when at the limit */

	int maxmsg;			/* limit on allocated messages */
	int nmsg;			/* messages currently allocated */
	int nmsgstarve;			/* times msgAlloc had to sleep */

	VtLock* rlock; /* read */	/* held by msgRead, msgProc, cmdMsg */
	Msg* rhead;			/* read queue, linked through Msg.rwnext */
	Msg* rtail;
	VtRendez* rrendez;		/* msgProc sleeps here waiting for work */

	int maxproc;			/* limit on msgProc threads */
	int nproc;			/* msgProc threads started and not yet exited */
	int nprocstarve;		/* times msgRead found no idle proc at the limit */

	u32int msize; /* immutable */	/* size of each Msg.data buffer */
} mbox;
34
/*
 * Global connection state: a pool of free Con structures and
 * the list of connections currently in use.
 */
static struct {
	VtLock* alock; /* alloc */	/* held by conAlloc and conFree */
	Con* ahead;			/* free list, linked through Con.anext */
	VtRendez* arendez;		/* conAlloc sleeps here when at the limit */

	/*
	 * cmdWho and cmdCon hold clock while walking the active
	 * list; conAlloc and conFree modify the list while
	 * holding alock.
	 */
	VtLock* clock;
	Con* chead;			/* active list, linked through Con.cnext/cprev */
	Con* ctail;

	int maxcon;			/* limit on allocated connections */
	int ncon;			/* connections currently allocated */
	int nconstarve;			/* times conAlloc had to sleep */

	u32int msize;			/* size of each Con.data buffer */
} cbox;
50
51 static void
conFree(Con * con)52 conFree(Con* con)
53 {
54 assert(con->version == nil);
55 assert(con->mhead == nil);
56 assert(con->whead == nil);
57 assert(con->nfid == 0);
58 assert(con->state == ConMoribund);
59
60 if(con->fd >= 0){
61 close(con->fd);
62 con->fd = -1;
63 }
64 con->state = ConDead;
65 con->aok = 0;
66 con->flags = 0;
67 con->isconsole = 0;
68
69 vtLock(cbox.alock);
70 if(con->cprev != nil)
71 con->cprev->cnext = con->cnext;
72 else
73 cbox.chead = con->cnext;
74 if(con->cnext != nil)
75 con->cnext->cprev = con->cprev;
76 else
77 cbox.ctail = con->cprev;
78 con->cprev = con->cnext = nil;
79
80 if(cbox.ncon > cbox.maxcon){
81 if(con->name != nil)
82 vtMemFree(con->name);
83 vtLockFree(con->fidlock);
84 vtMemFree(con->data);
85 vtRendezFree(con->wrendez);
86 vtLockFree(con->wlock);
87 vtRendezFree(con->mrendez);
88 vtLockFree(con->mlock);
89 vtRendezFree(con->rendez);
90 vtLockFree(con->lock);
91 vtMemFree(con);
92 cbox.ncon--;
93 vtUnlock(cbox.alock);
94 return;
95 }
96 con->anext = cbox.ahead;
97 cbox.ahead = con;
98 if(con->anext == nil)
99 vtWakeup(cbox.arendez);
100 vtUnlock(cbox.alock);
101 }
102
103 static void
msgFree(Msg * m)104 msgFree(Msg* m)
105 {
106 assert(m->rwnext == nil);
107 assert(m->flush == nil);
108
109 vtLock(mbox.alock);
110 if(mbox.nmsg > mbox.maxmsg){
111 vtMemFree(m->data);
112 vtMemFree(m);
113 mbox.nmsg--;
114 vtUnlock(mbox.alock);
115 return;
116 }
117 m->anext = mbox.ahead;
118 mbox.ahead = m;
119 if(m->anext == nil)
120 vtWakeup(mbox.arendez);
121 vtUnlock(mbox.alock);
122 }
123
124 static Msg*
msgAlloc(Con * con)125 msgAlloc(Con* con)
126 {
127 Msg *m;
128
129 vtLock(mbox.alock);
130 while(mbox.ahead == nil){
131 if(mbox.nmsg >= mbox.maxmsg){
132 mbox.nmsgstarve++;
133 vtSleep(mbox.arendez);
134 continue;
135 }
136 m = vtMemAllocZ(sizeof(Msg));
137 m->data = vtMemAlloc(mbox.msize);
138 m->msize = mbox.msize;
139 mbox.nmsg++;
140 mbox.ahead = m;
141 break;
142 }
143 m = mbox.ahead;
144 mbox.ahead = m->anext;
145 m->anext = nil;
146 vtUnlock(mbox.alock);
147
148 m->con = con;
149 m->state = MsgR;
150 m->nowq = 0;
151
152 return m;
153 }
154
155 static void
msgMunlink(Msg * m)156 msgMunlink(Msg* m)
157 {
158 Con *con;
159
160 con = m->con;
161
162 if(m->mprev != nil)
163 m->mprev->mnext = m->mnext;
164 else
165 con->mhead = m->mnext;
166 if(m->mnext != nil)
167 m->mnext->mprev = m->mprev;
168 else
169 con->mtail = m->mprev;
170 m->mprev = m->mnext = nil;
171 }
172
173 void
msgFlush(Msg * m)174 msgFlush(Msg* m)
175 {
176 Con *con;
177 Msg *flush, *old;
178
179 con = m->con;
180
181 if(Dflag)
182 fprint(2, "msgFlush %F\n", &m->t);
183
184 /*
185 * If this Tflush has been flushed, nothing to do.
186 * Look for the message to be flushed in the
187 * queue of all messages still on this connection.
188 * If it's not found must assume Elvis has already
189 * left the building and reply normally.
190 */
191 vtLock(con->mlock);
192 if(m->state == MsgF){
193 vtUnlock(con->mlock);
194 return;
195 }
196 for(old = con->mhead; old != nil; old = old->mnext)
197 if(old->t.tag == m->t.oldtag)
198 break;
199 if(old == nil){
200 if(Dflag)
201 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
202 vtUnlock(con->mlock);
203 return;
204 }
205
206 if(Dflag)
207 fprint(2, "\tmsgFlush found %F\n", &old->t);
208
209 /*
210 * Found it.
211 * There are two cases where the old message can be
212 * truly flushed and no reply to the original message given.
213 * The first is when the old message is in MsgR state; no
214 * processing has been done yet and it is still on the read
215 * queue. The second is if old is a Tflush, which doesn't
216 * affect the server state. In both cases, put the old
217 * message into MsgF state and let MsgWrite toss it after
218 * pulling it off the queue.
219 */
220 if(old->state == MsgR || old->t.type == Tflush){
221 old->state = MsgF;
222 if(Dflag)
223 fprint(2, "msgFlush: change %d from MsgR to MsgF\n",
224 m->t.oldtag);
225 }
226
227 /*
228 * Link this flush message and the old message
229 * so multiple flushes can be coalesced (if there are
230 * multiple Tflush messages for a particular pending
231 * request, it is only necessary to respond to the last
232 * one, so any previous can be removed) and to be
233 * sure flushes wait for their corresponding old
234 * message to go out first.
235 * Waiting flush messages do not go on the write queue,
236 * they are processed after the old message is dealt
237 * with. There's no real need to protect the setting of
238 * Msg.nowq, the only code to check it runs in this
239 * process after this routine returns.
240 */
241 if((flush = old->flush) != nil){
242 if(Dflag)
243 fprint(2, "msgFlush: remove %d from %d list\n",
244 old->flush->t.tag, old->t.tag);
245 m->flush = flush->flush;
246 flush->flush = nil;
247 msgMunlink(flush);
248 msgFree(flush);
249 }
250 old->flush = m;
251 m->nowq = 1;
252
253 if(Dflag)
254 fprint(2, "msgFlush: add %d to %d queue\n",
255 m->t.tag, old->t.tag);
256 vtUnlock(con->mlock);
257 }
258
259 static void
msgProc(void *)260 msgProc(void*)
261 {
262 Msg *m;
263 char *e;
264 Con *con;
265
266 vtThreadSetName("msgProc");
267
268 for(;;){
269 /*
270 * If surplus to requirements, exit.
271 * If not, wait for and pull a message off
272 * the read queue.
273 */
274 vtLock(mbox.rlock);
275 if(mbox.nproc > mbox.maxproc){
276 mbox.nproc--;
277 vtUnlock(mbox.rlock);
278 break;
279 }
280 while(mbox.rhead == nil)
281 vtSleep(mbox.rrendez);
282 m = mbox.rhead;
283 mbox.rhead = m->rwnext;
284 m->rwnext = nil;
285 vtUnlock(mbox.rlock);
286
287 con = m->con;
288 e = nil;
289
290 /*
291 * If the message has been flushed before
292 * any 9P processing has started, mark it so
293 * none will be attempted.
294 */
295 vtLock(con->mlock);
296 if(m->state == MsgF)
297 e = "flushed";
298 else
299 m->state = Msg9;
300 vtUnlock(con->mlock);
301
302 if(e == nil){
303 /*
304 * explain this
305 */
306 vtLock(con->lock);
307 if(m->t.type == Tversion){
308 con->version = m;
309 con->state = ConDown;
310 while(con->mhead != m)
311 vtSleep(con->rendez);
312 assert(con->state == ConDown);
313 if(con->version == m){
314 con->version = nil;
315 con->state = ConInit;
316 }
317 else
318 e = "Tversion aborted";
319 }
320 else if(con->state != ConUp)
321 e = "connection not ready";
322 vtUnlock(con->lock);
323 }
324
325 /*
326 * Dispatch if not error already.
327 */
328 m->r.tag = m->t.tag;
329 if(e == nil && !(*rFcall[m->t.type])(m))
330 e = vtGetError();
331 if(e != nil){
332 m->r.type = Rerror;
333 m->r.ename = e;
334 }
335 else
336 m->r.type = m->t.type+1;
337
338 /*
339 * Put the message (with reply) on the
340 * write queue and wakeup the write process.
341 */
342 if(!m->nowq){
343 vtLock(con->wlock);
344 if(con->whead == nil)
345 con->whead = m;
346 else
347 con->wtail->rwnext = m;
348 con->wtail = m;
349 vtWakeup(con->wrendez);
350 vtUnlock(con->wlock);
351 }
352 }
353 }
354
355 static void
msgRead(void * v)356 msgRead(void* v)
357 {
358 Msg *m;
359 Con *con;
360 int eof, fd, n;
361
362 vtThreadSetName("msgRead");
363
364 con = v;
365 fd = con->fd;
366 eof = 0;
367
368 while(!eof){
369 m = msgAlloc(con);
370
371 while((n = read9pmsg(fd, m->data, con->msize)) == 0)
372 ;
373 if(n < 0){
374 m->t.type = Tversion;
375 m->t.fid = NOFID;
376 m->t.tag = NOTAG;
377 m->t.msize = con->msize;
378 m->t.version = "9PEoF";
379 eof = 1;
380 }
381 else if(convM2S(m->data, n, &m->t) != n){
382 if(Dflag)
383 fprint(2, "msgRead: convM2S error: %s\n",
384 con->name);
385 msgFree(m);
386 continue;
387 }
388 if(Dflag)
389 fprint(2, "msgRead %p: t %F\n", con, &m->t);
390
391 vtLock(con->mlock);
392 if(con->mtail != nil){
393 m->mprev = con->mtail;
394 con->mtail->mnext = m;
395 }
396 else{
397 con->mhead = m;
398 m->mprev = nil;
399 }
400 con->mtail = m;
401 vtUnlock(con->mlock);
402
403 vtLock(mbox.rlock);
404 if(mbox.rhead == nil){
405 mbox.rhead = m;
406 if(!vtWakeup(mbox.rrendez)){
407 if(mbox.nproc < mbox.maxproc){
408 if(vtThread(msgProc, nil) > 0)
409 mbox.nproc++;
410 }
411 else
412 mbox.nprocstarve++;
413 }
414 /*
415 * don't need this surely?
416 vtWakeup(mbox.rrendez);
417 */
418 }
419 else
420 mbox.rtail->rwnext = m;
421 mbox.rtail = m;
422 vtUnlock(mbox.rlock);
423 }
424 }
425
426 static void
msgWrite(void * v)427 msgWrite(void* v)
428 {
429 Con *con;
430 int eof, n;
431 Msg *flush, *m;
432
433 vtThreadSetName("msgWrite");
434
435 con = v;
436 if(vtThread(msgRead, con) < 0){
437 conFree(con);
438 return;
439 }
440
441 for(;;){
442 /*
443 * Wait for and pull a message off the write queue.
444 */
445 vtLock(con->wlock);
446 while(con->whead == nil)
447 vtSleep(con->wrendez);
448 m = con->whead;
449 con->whead = m->rwnext;
450 m->rwnext = nil;
451 assert(!m->nowq);
452 vtUnlock(con->wlock);
453
454 eof = 0;
455
456 /*
457 * Write each message (if it hasn't been flushed)
458 * followed by any messages waiting for it to complete.
459 */
460 vtLock(con->mlock);
461 while(m != nil){
462 msgMunlink(m);
463
464 if(Dflag)
465 fprint(2, "msgWrite %d: r %F\n",
466 m->state, &m->r);
467
468 if(m->state != MsgF){
469 m->state = MsgW;
470 vtUnlock(con->mlock);
471
472 n = convS2M(&m->r, con->data, con->msize);
473 if(write(con->fd, con->data, n) != n)
474 eof = 1;
475
476 vtLock(con->mlock);
477 }
478
479 if((flush = m->flush) != nil){
480 assert(flush->nowq);
481 m->flush = nil;
482 }
483 msgFree(m);
484 m = flush;
485 }
486 vtUnlock(con->mlock);
487
488 vtLock(con->lock);
489 if(eof && con->fd >= 0){
490 close(con->fd);
491 con->fd = -1;
492 }
493 if(con->state == ConDown)
494 vtWakeup(con->rendez);
495 if(con->state == ConMoribund && con->mhead == nil){
496 vtUnlock(con->lock);
497 conFree(con);
498 break;
499 }
500 vtUnlock(con->lock);
501 }
502 }
503
504 Con*
conAlloc(int fd,char * name,int flags)505 conAlloc(int fd, char* name, int flags)
506 {
507 Con *con;
508 char buf[128], *p;
509 int rfd, n;
510
511 vtLock(cbox.alock);
512 while(cbox.ahead == nil){
513 if(cbox.ncon >= cbox.maxcon){
514 cbox.nconstarve++;
515 vtSleep(cbox.arendez);
516 continue;
517 }
518 con = vtMemAllocZ(sizeof(Con));
519 con->lock = vtLockAlloc();
520 con->rendez = vtRendezAlloc(con->lock);
521 con->data = vtMemAlloc(cbox.msize);
522 con->msize = cbox.msize;
523 con->alock = vtLockAlloc();
524 con->mlock = vtLockAlloc();
525 con->mrendez = vtRendezAlloc(con->mlock);
526 con->wlock = vtLockAlloc();
527 con->wrendez = vtRendezAlloc(con->wlock);
528 con->fidlock = vtLockAlloc();
529
530 cbox.ncon++;
531 cbox.ahead = con;
532 break;
533 }
534 con = cbox.ahead;
535 cbox.ahead = con->anext;
536 con->anext = nil;
537
538 if(cbox.ctail != nil){
539 con->cprev = cbox.ctail;
540 cbox.ctail->cnext = con;
541 }
542 else{
543 cbox.chead = con;
544 con->cprev = nil;
545 }
546 cbox.ctail = con;
547
548 assert(con->mhead == nil);
549 assert(con->whead == nil);
550 assert(con->fhead == nil);
551 assert(con->nfid == 0);
552
553 con->state = ConNew;
554 con->fd = fd;
555 if(con->name != nil){
556 vtMemFree(con->name);
557 con->name = nil;
558 }
559 if(name != nil)
560 con->name = vtStrDup(name);
561 else
562 con->name = vtStrDup("unknown");
563 con->remote[0] = 0;
564 snprint(buf, sizeof buf, "%s/remote", con->name);
565 if((rfd = open(buf, OREAD)) >= 0){
566 n = read(rfd, buf, sizeof buf-1);
567 close(rfd);
568 if(n > 0){
569 buf[n] = 0;
570 if((p = strchr(buf, '\n')) != nil)
571 *p = 0;
572 strecpy(con->remote, con->remote+sizeof con->remote, buf);
573 }
574 }
575 con->flags = flags;
576 con->isconsole = 0;
577 vtUnlock(cbox.alock);
578
579 if(vtThread(msgWrite, con) < 0){
580 conFree(con);
581 return nil;
582 }
583
584 return con;
585 }
586
587 static int
cmdMsg(int argc,char * argv[])588 cmdMsg(int argc, char* argv[])
589 {
590 char *p;
591 char *usage = "usage: msg [-m nmsg] [-p nproc]";
592 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
593
594 maxmsg = maxproc = 0;
595
596 ARGBEGIN{
597 default:
598 return cliError(usage);
599 case 'm':
600 p = ARGF();
601 if(p == nil)
602 return cliError(usage);
603 maxmsg = strtol(argv[0], &p, 0);
604 if(maxmsg <= 0 || p == argv[0] || *p != '\0')
605 return cliError(usage);
606 break;
607 case 'p':
608 p = ARGF();
609 if(p == nil)
610 return cliError(usage);
611 maxproc = strtol(argv[0], &p, 0);
612 if(maxproc <= 0 || p == argv[0] || *p != '\0')
613 return cliError(usage);
614 break;
615 }ARGEND
616 if(argc)
617 return cliError(usage);
618
619 vtLock(mbox.alock);
620 if(maxmsg)
621 mbox.maxmsg = maxmsg;
622 maxmsg = mbox.maxmsg;
623 nmsg = mbox.nmsg;
624 nmsgstarve = mbox.nmsgstarve;
625 vtUnlock(mbox.alock);
626
627 vtLock(mbox.rlock);
628 if(maxproc)
629 mbox.maxproc = maxproc;
630 maxproc = mbox.maxproc;
631 nproc = mbox.nproc;
632 nprocstarve = mbox.nprocstarve;
633 vtUnlock(mbox.rlock);
634
635 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
636 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
637 nmsg, nmsgstarve, nproc, nprocstarve);
638
639 return 1;
640 }
641
642 static int
scmp(Fid * a,Fid * b)643 scmp(Fid *a, Fid *b)
644 {
645 if(a == 0)
646 return 1;
647 if(b == 0)
648 return -1;
649 return strcmp(a->uname, b->uname);
650 }
651
652 static Fid*
fidMerge(Fid * a,Fid * b)653 fidMerge(Fid *a, Fid *b)
654 {
655 Fid *s, **l;
656
657 l = &s;
658 while(a || b){
659 if(scmp(a, b) < 0){
660 *l = a;
661 l = &a->sort;
662 a = a->sort;
663 }else{
664 *l = b;
665 l = &b->sort;
666 b = b->sort;
667 }
668 }
669 *l = 0;
670 return s;
671 }
672
673 static Fid*
fidMergeSort(Fid * f)674 fidMergeSort(Fid *f)
675 {
676 int delay;
677 Fid *a, *b;
678
679 if(f == nil)
680 return nil;
681 if(f->sort == nil)
682 return f;
683
684 a = b = f;
685 delay = 1;
686 while(a && b){
687 if(delay) /* easy way to handle 2-element list */
688 delay = 0;
689 else
690 a = a->sort;
691 if(b = b->sort)
692 b = b->sort;
693 }
694
695 b = a->sort;
696 a->sort = nil;
697
698 a = fidMergeSort(f);
699 b = fidMergeSort(b);
700
701 return fidMerge(a, b);
702 }
703
704 static int
cmdWho(int argc,char * argv[])705 cmdWho(int argc, char* argv[])
706 {
707 char *usage = "usage: who";
708 int i, l1, l2, l;
709 Con *con;
710 Fid *fid, *last;
711
712 ARGBEGIN{
713 default:
714 return cliError(usage);
715 }ARGEND
716
717 if(argc > 0)
718 return cliError(usage);
719
720 vtRLock(cbox.clock);
721 l1 = 0;
722 l2 = 0;
723 for(con=cbox.chead; con; con=con->cnext){
724 if((l = strlen(con->name)) > l1)
725 l1 = l;
726 if((l = strlen(con->remote)) > l2)
727 l2 = l;
728 }
729 for(con=cbox.chead; con; con=con->cnext){
730 consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote);
731 vtLock(con->fidlock);
732 last = nil;
733 for(i=0; i<NFidHash; i++)
734 for(fid=con->fidhash[i]; fid; fid=fid->hash)
735 if(fid->fidno != NOFID && fid->uname){
736 fid->sort = last;
737 last = fid;
738 }
739 fid = fidMergeSort(last);
740 last = nil;
741 for(; fid; last=fid, fid=fid->sort)
742 if(last==nil || strcmp(fid->uname, last->uname) != 0)
743 consPrint(" %q", fid->uname);
744 vtUnlock(con->fidlock);
745 consPrint("\n");
746 }
747 vtRUnlock(cbox.clock);
748 return 1;
749 }
750
751 void
msgInit(void)752 msgInit(void)
753 {
754 mbox.alock = vtLockAlloc();
755 mbox.arendez = vtRendezAlloc(mbox.alock);
756
757 mbox.rlock = vtLockAlloc();
758 mbox.rrendez = vtRendezAlloc(mbox.rlock);
759
760 mbox.maxmsg = NMsgInit;
761 mbox.maxproc = NMsgProcInit;
762 mbox.msize = NMsizeInit;
763
764 cliAddCmd("msg", cmdMsg);
765 }
766
767 static int
cmdCon(int argc,char * argv[])768 cmdCon(int argc, char* argv[])
769 {
770 char *p;
771 Con *con;
772 char *usage = "usage: con [-m ncon]";
773 int maxcon, ncon, nconstarve;
774
775 maxcon = 0;
776
777 ARGBEGIN{
778 default:
779 return cliError(usage);
780 case 'm':
781 p = ARGF();
782 if(p == nil)
783 return cliError(usage);
784 maxcon = strtol(argv[0], &p, 0);
785 if(maxcon <= 0 || p == argv[0] || *p != '\0')
786 return cliError(usage);
787 break;
788 }ARGEND
789 if(argc)
790 return cliError(usage);
791
792 vtLock(cbox.clock);
793 if(maxcon)
794 cbox.maxcon = maxcon;
795 maxcon = cbox.maxcon;
796 ncon = cbox.ncon;
797 nconstarve = cbox.nconstarve;
798 vtUnlock(cbox.clock);
799
800 consPrint("\tcon -m %d\n", maxcon);
801 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
802
803 vtRLock(cbox.clock);
804 for(con = cbox.chead; con != nil; con = con->cnext){
805 consPrint("\t%s\n", con->name);
806 }
807 vtRUnlock(cbox.clock);
808
809 return 1;
810 }
811
812 void
conInit(void)813 conInit(void)
814 {
815 cbox.alock = vtLockAlloc();
816 cbox.arendez = vtRendezAlloc(cbox.alock);
817
818 cbox.clock = vtLockAlloc();
819
820 cbox.maxcon = NConInit;
821 cbox.msize = NMsizeInit;
822
823 cliAddCmd("con", cmdCon);
824 cliAddCmd("who", cmdWho);
825 }
826