1 #include "stdinc.h"
2
3 #include "9.h"
4 #include "dat.h"
5 #include "fns.h"
6
7 enum {
8 NConInit = 128,
9 NMsgInit = 384,
10 NMsgProcInit = 64,
11 NMsizeInit = 16*1024+IOHDRSZ,
12 };
13
14 static struct {
15 QLock alock; /* alloc */
16 Msg* ahead;
17 Rendez arendez;
18
19 int maxmsg;
20 int nmsg;
21 int nmsgstarve;
22
23 QLock rlock; /* read */
24 Msg* rhead;
25 Msg* rtail;
26 Rendez rrendez;
27
28 int maxproc;
29 int nproc;
30 int nprocstarve;
31
32 u32int msize; /* immutable */
33 } mbox;
34
35 static struct {
36 QLock alock; /* alloc */
37 Con* ahead;
38 Rendez arendez;
39
40 RWLock clock;
41 Con* chead;
42 Con* ctail;
43
44 int maxcon;
45 int ncon;
46 int nconstarve;
47
48 u32int msize;
49 } cbox;
50
51 static void
conFree(Con * con)52 conFree(Con* con)
53 {
54 assert(con->version == nil);
55 assert(con->mhead == nil);
56 assert(con->whead == nil);
57 assert(con->nfid == 0);
58 assert(con->state == ConMoribund);
59
60 if(con->fd >= 0){
61 close(con->fd);
62 con->fd = -1;
63 }
64 con->state = ConDead;
65 con->aok = 0;
66 con->flags = 0;
67 con->isconsole = 0;
68
69 qlock(&cbox.alock);
70 if(con->cprev != nil)
71 con->cprev->cnext = con->cnext;
72 else
73 cbox.chead = con->cnext;
74 if(con->cnext != nil)
75 con->cnext->cprev = con->cprev;
76 else
77 cbox.ctail = con->cprev;
78 con->cprev = con->cnext = nil;
79
80 if(cbox.ncon > cbox.maxcon){
81 if(con->name != nil)
82 vtfree(con->name);
83 vtfree(con->data);
84 vtfree(con);
85 cbox.ncon--;
86 qunlock(&cbox.alock);
87 return;
88 }
89 con->anext = cbox.ahead;
90 cbox.ahead = con;
91 if(con->anext == nil)
92 rwakeup(&cbox.arendez);
93 qunlock(&cbox.alock);
94 }
95
96 static void
msgFree(Msg * m)97 msgFree(Msg* m)
98 {
99 assert(m->rwnext == nil);
100 assert(m->flush == nil);
101
102 qlock(&mbox.alock);
103 if(mbox.nmsg > mbox.maxmsg){
104 vtfree(m->data);
105 vtfree(m);
106 mbox.nmsg--;
107 qunlock(&mbox.alock);
108 return;
109 }
110 m->anext = mbox.ahead;
111 mbox.ahead = m;
112 if(m->anext == nil)
113 rwakeup(&mbox.arendez);
114 qunlock(&mbox.alock);
115 }
116
117 static Msg*
msgAlloc(Con * con)118 msgAlloc(Con* con)
119 {
120 Msg *m;
121
122 qlock(&mbox.alock);
123 while(mbox.ahead == nil){
124 if(mbox.nmsg >= mbox.maxmsg){
125 mbox.nmsgstarve++;
126 rsleep(&mbox.arendez);
127 continue;
128 }
129 m = vtmallocz(sizeof(Msg));
130 m->data = vtmalloc(mbox.msize);
131 m->msize = mbox.msize;
132 mbox.nmsg++;
133 mbox.ahead = m;
134 break;
135 }
136 m = mbox.ahead;
137 mbox.ahead = m->anext;
138 m->anext = nil;
139 qunlock(&mbox.alock);
140
141 m->con = con;
142 m->state = MsgR;
143 m->nowq = 0;
144
145 return m;
146 }
147
148 static void
msgMunlink(Msg * m)149 msgMunlink(Msg* m)
150 {
151 Con *con;
152
153 con = m->con;
154
155 if(m->mprev != nil)
156 m->mprev->mnext = m->mnext;
157 else
158 con->mhead = m->mnext;
159 if(m->mnext != nil)
160 m->mnext->mprev = m->mprev;
161 else
162 con->mtail = m->mprev;
163 m->mprev = m->mnext = nil;
164 }
165
166 void
msgFlush(Msg * m)167 msgFlush(Msg* m)
168 {
169 Con *con;
170 Msg *flush, *old;
171
172 con = m->con;
173
174 if(Dflag)
175 fprint(2, "msgFlush %F\n", &m->t);
176
177 /*
178 * If this Tflush has been flushed, nothing to do.
179 * Look for the message to be flushed in the
180 * queue of all messages still on this connection.
181 * If it's not found must assume Elvis has already
182 * left the building and reply normally.
183 */
184 qlock(&con->mlock);
185 if(m->state == MsgF){
186 qunlock(&con->mlock);
187 return;
188 }
189 for(old = con->mhead; old != nil; old = old->mnext)
190 if(old->t.tag == m->t.oldtag)
191 break;
192 if(old == nil){
193 if(Dflag)
194 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag);
195 qunlock(&con->mlock);
196 return;
197 }
198
199 if(Dflag)
200 fprint(2, "\tmsgFlush found %F\n", &old->t);
201
202 /*
203 * Found it.
204 * There are two cases where the old message can be
205 * truly flushed and no reply to the original message given.
206 * The first is when the old message is in MsgR state; no
207 * processing has been done yet and it is still on the read
208 * queue. The second is if old is a Tflush, which doesn't
209 * affect the server state. In both cases, put the old
210 * message into MsgF state and let MsgWrite toss it after
211 * pulling it off the queue.
212 */
213 if(old->state == MsgR || old->t.type == Tflush){
214 old->state = MsgF;
215 if(Dflag)
216 fprint(2, "msgFlush: change %d from MsgR to MsgF\n",
217 m->t.oldtag);
218 }
219
220 /*
221 * Link this flush message and the old message
222 * so multiple flushes can be coalesced (if there are
223 * multiple Tflush messages for a particular pending
224 * request, it is only necessary to respond to the last
225 * one, so any previous can be removed) and to be
226 * sure flushes wait for their corresponding old
227 * message to go out first.
228 * Waiting flush messages do not go on the write queue,
229 * they are processed after the old message is dealt
230 * with. There's no real need to protect the setting of
231 * Msg.nowq, the only code to check it runs in this
232 * process after this routine returns.
233 */
234 if((flush = old->flush) != nil){
235 if(Dflag)
236 fprint(2, "msgFlush: remove %d from %d list\n",
237 old->flush->t.tag, old->t.tag);
238 m->flush = flush->flush;
239 flush->flush = nil;
240 msgMunlink(flush);
241 msgFree(flush);
242 }
243 old->flush = m;
244 m->nowq = 1;
245
246 if(Dflag)
247 fprint(2, "msgFlush: add %d to %d queue\n",
248 m->t.tag, old->t.tag);
249 qunlock(&con->mlock);
250 }
251
252 static void
msgProc(void *)253 msgProc(void*)
254 {
255 Msg *m;
256 char e[ERRMAX];
257 Con *con;
258
259 threadsetname("msgProc");
260
261 for(;;){
262 /*
263 * If surplus to requirements, exit.
264 * If not, wait for and pull a message off
265 * the read queue.
266 */
267 qlock(&mbox.rlock);
268 if(mbox.nproc > mbox.maxproc){
269 mbox.nproc--;
270 qunlock(&mbox.rlock);
271 break;
272 }
273 while(mbox.rhead == nil)
274 rsleep(&mbox.rrendez);
275 m = mbox.rhead;
276 mbox.rhead = m->rwnext;
277 m->rwnext = nil;
278 qunlock(&mbox.rlock);
279
280 con = m->con;
281 *e = 0;
282
283 /*
284 * If the message has been flushed before
285 * any 9P processing has started, mark it so
286 * none will be attempted.
287 */
288 qlock(&con->mlock);
289 if(m->state == MsgF)
290 strcpy(e, "flushed");
291 else
292 m->state = Msg9;
293 qunlock(&con->mlock);
294
295 if(*e == 0){
296 /*
297 * explain this
298 */
299 qlock(&con->lock);
300 if(m->t.type == Tversion){
301 con->version = m;
302 con->state = ConDown;
303 while(con->mhead != m)
304 rsleep(&con->rendez);
305 assert(con->state == ConDown);
306 if(con->version == m){
307 con->version = nil;
308 con->state = ConInit;
309 }
310 else
311 strcpy(e, "Tversion aborted");
312 }
313 else if(con->state != ConUp)
314 strcpy(e, "connection not ready");
315 qunlock(&con->lock);
316 }
317
318 /*
319 * Dispatch if not error already.
320 */
321 m->r.tag = m->t.tag;
322 if(*e == 0 && !(*rFcall[m->t.type])(m))
323 rerrstr(e, sizeof e);
324 if(*e != 0){
325 m->r.type = Rerror;
326 m->r.ename = e;
327 }
328 else
329 m->r.type = m->t.type+1;
330
331 /*
332 * Put the message (with reply) on the
333 * write queue and wakeup the write process.
334 */
335 if(!m->nowq){
336 qlock(&con->wlock);
337 if(con->whead == nil)
338 con->whead = m;
339 else
340 con->wtail->rwnext = m;
341 con->wtail = m;
342 rwakeup(&con->wrendez);
343 qunlock(&con->wlock);
344 }
345 }
346 }
347
348 static void
msgRead(void * v)349 msgRead(void* v)
350 {
351 Msg *m;
352 Con *con;
353 int eof, fd, n;
354
355 threadsetname("msgRead");
356
357 con = v;
358 fd = con->fd;
359 eof = 0;
360
361 while(!eof){
362 m = msgAlloc(con);
363
364 while((n = read9pmsg(fd, m->data, con->msize)) == 0)
365 ;
366 if(n < 0){
367 m->t.type = Tversion;
368 m->t.fid = NOFID;
369 m->t.tag = NOTAG;
370 m->t.msize = con->msize;
371 m->t.version = "9PEoF";
372 eof = 1;
373 }
374 else if(convM2S(m->data, n, &m->t) != n){
375 if(Dflag)
376 fprint(2, "msgRead: convM2S error: %s\n",
377 con->name);
378 msgFree(m);
379 continue;
380 }
381 if(Dflag)
382 fprint(2, "msgRead %p: t %F\n", con, &m->t);
383
384 qlock(&con->mlock);
385 if(con->mtail != nil){
386 m->mprev = con->mtail;
387 con->mtail->mnext = m;
388 }
389 else{
390 con->mhead = m;
391 m->mprev = nil;
392 }
393 con->mtail = m;
394 qunlock(&con->mlock);
395
396 qlock(&mbox.rlock);
397 if(mbox.rhead == nil){
398 mbox.rhead = m;
399 if(!rwakeup(&mbox.rrendez)){
400 if(mbox.nproc < mbox.maxproc){
401 if(proccreate(msgProc, nil, STACK) > 0)
402 mbox.nproc++;
403 }
404 else
405 mbox.nprocstarve++;
406 }
407 /*
408 * don't need this surely?
409 rwakeup(&mbox.rrendez);
410 */
411 }
412 else
413 mbox.rtail->rwnext = m;
414 mbox.rtail = m;
415 qunlock(&mbox.rlock);
416 }
417 }
418
419 static void
msgWrite(void * v)420 msgWrite(void* v)
421 {
422 Con *con;
423 int eof, n;
424 Msg *flush, *m;
425
426 threadsetname("msgWrite");
427
428 con = v;
429 if(proccreate(msgRead, con, STACK) < 0){
430 conFree(con);
431 return;
432 }
433
434 for(;;){
435 /*
436 * Wait for and pull a message off the write queue.
437 */
438 qlock(&con->wlock);
439 while(con->whead == nil)
440 rsleep(&con->wrendez);
441 m = con->whead;
442 con->whead = m->rwnext;
443 m->rwnext = nil;
444 assert(!m->nowq);
445 qunlock(&con->wlock);
446
447 eof = 0;
448
449 /*
450 * Write each message (if it hasn't been flushed)
451 * followed by any messages waiting for it to complete.
452 */
453 qlock(&con->mlock);
454 while(m != nil){
455 msgMunlink(m);
456
457 if(Dflag)
458 fprint(2, "msgWrite %d: r %F\n",
459 m->state, &m->r);
460
461 if(m->state != MsgF){
462 m->state = MsgW;
463 qunlock(&con->mlock);
464
465 n = convS2M(&m->r, con->data, con->msize);
466 if(write(con->fd, con->data, n) != n)
467 eof = 1;
468
469 qlock(&con->mlock);
470 }
471
472 if((flush = m->flush) != nil){
473 assert(flush->nowq);
474 m->flush = nil;
475 }
476 msgFree(m);
477 m = flush;
478 }
479 qunlock(&con->mlock);
480
481 qlock(&con->lock);
482 if(eof && con->fd >= 0){
483 close(con->fd);
484 con->fd = -1;
485 }
486 if(con->state == ConDown)
487 rwakeup(&con->rendez);
488 if(con->state == ConMoribund && con->mhead == nil){
489 qunlock(&con->lock);
490 conFree(con);
491 break;
492 }
493 qunlock(&con->lock);
494 }
495 }
496
497 Con*
conAlloc(int fd,char * name,int flags)498 conAlloc(int fd, char* name, int flags)
499 {
500 Con *con;
501 char buf[128], *p;
502 int rfd, n;
503
504 qlock(&cbox.alock);
505 while(cbox.ahead == nil){
506 if(cbox.ncon >= cbox.maxcon){
507 cbox.nconstarve++;
508 rsleep(&cbox.arendez);
509 continue;
510 }
511 con = vtmallocz(sizeof(Con));
512 con->rendez.l = &con->lock;
513 con->data = vtmalloc(cbox.msize);
514 con->msize = cbox.msize;
515 con->mrendez.l = &con->mlock;
516 con->wrendez.l = &con->wlock;
517
518 cbox.ncon++;
519 cbox.ahead = con;
520 break;
521 }
522 con = cbox.ahead;
523 cbox.ahead = con->anext;
524 con->anext = nil;
525
526 if(cbox.ctail != nil){
527 con->cprev = cbox.ctail;
528 cbox.ctail->cnext = con;
529 }
530 else{
531 cbox.chead = con;
532 con->cprev = nil;
533 }
534 cbox.ctail = con;
535
536 assert(con->mhead == nil);
537 assert(con->whead == nil);
538 assert(con->fhead == nil);
539 assert(con->nfid == 0);
540
541 con->state = ConNew;
542 con->fd = fd;
543 if(con->name != nil){
544 vtfree(con->name);
545 con->name = nil;
546 }
547 if(name != nil)
548 con->name = vtstrdup(name);
549 else
550 con->name = vtstrdup("unknown");
551 con->remote[0] = 0;
552 snprint(buf, sizeof buf, "%s/remote", con->name);
553 if((rfd = open(buf, OREAD)) >= 0){
554 n = read(rfd, buf, sizeof buf-1);
555 close(rfd);
556 if(n > 0){
557 buf[n] = 0;
558 if((p = strchr(buf, '\n')) != nil)
559 *p = 0;
560 strecpy(con->remote, con->remote+sizeof con->remote, buf);
561 }
562 }
563 con->flags = flags;
564 con->isconsole = 0;
565 qunlock(&cbox.alock);
566
567 if(proccreate(msgWrite, con, STACK) < 0){
568 conFree(con);
569 return nil;
570 }
571
572 return con;
573 }
574
575 static int
cmdMsg(int argc,char * argv[])576 cmdMsg(int argc, char* argv[])
577 {
578 char *p;
579 char *usage = "usage: msg [-m nmsg] [-p nproc]";
580 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve;
581
582 maxmsg = maxproc = 0;
583
584 ARGBEGIN{
585 default:
586 return cliError(usage);
587 case 'm':
588 p = ARGF();
589 if(p == nil)
590 return cliError(usage);
591 maxmsg = strtol(argv[0], &p, 0);
592 if(maxmsg <= 0 || p == argv[0] || *p != '\0')
593 return cliError(usage);
594 break;
595 case 'p':
596 p = ARGF();
597 if(p == nil)
598 return cliError(usage);
599 maxproc = strtol(argv[0], &p, 0);
600 if(maxproc <= 0 || p == argv[0] || *p != '\0')
601 return cliError(usage);
602 break;
603 }ARGEND
604 if(argc)
605 return cliError(usage);
606
607 qlock(&mbox.alock);
608 if(maxmsg)
609 mbox.maxmsg = maxmsg;
610 maxmsg = mbox.maxmsg;
611 nmsg = mbox.nmsg;
612 nmsgstarve = mbox.nmsgstarve;
613 qunlock(&mbox.alock);
614
615 qlock(&mbox.rlock);
616 if(maxproc)
617 mbox.maxproc = maxproc;
618 maxproc = mbox.maxproc;
619 nproc = mbox.nproc;
620 nprocstarve = mbox.nprocstarve;
621 qunlock(&mbox.rlock);
622
623 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc);
624 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n",
625 nmsg, nmsgstarve, nproc, nprocstarve);
626
627 return 1;
628 }
629
630 static int
scmp(Fid * a,Fid * b)631 scmp(Fid *a, Fid *b)
632 {
633 if(a == 0)
634 return 1;
635 if(b == 0)
636 return -1;
637 return strcmp(a->uname, b->uname);
638 }
639
640 static Fid*
fidMerge(Fid * a,Fid * b)641 fidMerge(Fid *a, Fid *b)
642 {
643 Fid *s, **l;
644
645 l = &s;
646 while(a || b){
647 if(scmp(a, b) < 0){
648 *l = a;
649 l = &a->sort;
650 a = a->sort;
651 }else{
652 *l = b;
653 l = &b->sort;
654 b = b->sort;
655 }
656 }
657 *l = 0;
658 return s;
659 }
660
661 static Fid*
fidMergeSort(Fid * f)662 fidMergeSort(Fid *f)
663 {
664 int delay;
665 Fid *a, *b;
666
667 if(f == nil)
668 return nil;
669 if(f->sort == nil)
670 return f;
671
672 a = b = f;
673 delay = 1;
674 while(a && b){
675 if(delay) /* easy way to handle 2-element list */
676 delay = 0;
677 else
678 a = a->sort;
679 if(b = b->sort)
680 b = b->sort;
681 }
682
683 b = a->sort;
684 a->sort = nil;
685
686 a = fidMergeSort(f);
687 b = fidMergeSort(b);
688
689 return fidMerge(a, b);
690 }
691
692 static int
cmdWho(int argc,char * argv[])693 cmdWho(int argc, char* argv[])
694 {
695 char *usage = "usage: who";
696 int i, l1, l2, l;
697 Con *con;
698 Fid *fid, *last;
699
700 ARGBEGIN{
701 default:
702 return cliError(usage);
703 }ARGEND
704
705 if(argc > 0)
706 return cliError(usage);
707
708 rlock(&cbox.clock);
709 l1 = 0;
710 l2 = 0;
711 for(con=cbox.chead; con; con=con->cnext){
712 if((l = strlen(con->name)) > l1)
713 l1 = l;
714 if((l = strlen(con->remote)) > l2)
715 l2 = l;
716 }
717 for(con=cbox.chead; con; con=con->cnext){
718 consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote);
719 qlock(&con->fidlock);
720 last = nil;
721 for(i=0; i<NFidHash; i++)
722 for(fid=con->fidhash[i]; fid; fid=fid->hash)
723 if(fid->fidno != NOFID && fid->uname){
724 fid->sort = last;
725 last = fid;
726 }
727 fid = fidMergeSort(last);
728 last = nil;
729 for(; fid; last=fid, fid=fid->sort)
730 if(last==nil || strcmp(fid->uname, last->uname) != 0)
731 consPrint(" %q", fid->uname);
732 qunlock(&con->fidlock);
733 consPrint("\n");
734 }
735 runlock(&cbox.clock);
736 return 1;
737 }
738
739 void
msgInit(void)740 msgInit(void)
741 {
742 mbox.arendez.l = &mbox.alock;
743
744 mbox.rrendez.l = &mbox.rlock;
745
746 mbox.maxmsg = NMsgInit;
747 mbox.maxproc = NMsgProcInit;
748 mbox.msize = NMsizeInit;
749
750 cliAddCmd("msg", cmdMsg);
751 }
752
753 static int
cmdCon(int argc,char * argv[])754 cmdCon(int argc, char* argv[])
755 {
756 char *p;
757 Con *con;
758 char *usage = "usage: con [-m ncon]";
759 int maxcon, ncon, nconstarve;
760
761 maxcon = 0;
762
763 ARGBEGIN{
764 default:
765 return cliError(usage);
766 case 'm':
767 p = ARGF();
768 if(p == nil)
769 return cliError(usage);
770 maxcon = strtol(argv[0], &p, 0);
771 if(maxcon <= 0 || p == argv[0] || *p != '\0')
772 return cliError(usage);
773 break;
774 }ARGEND
775 if(argc)
776 return cliError(usage);
777
778 wlock(&cbox.clock);
779 if(maxcon)
780 cbox.maxcon = maxcon;
781 maxcon = cbox.maxcon;
782 ncon = cbox.ncon;
783 nconstarve = cbox.nconstarve;
784 wunlock(&cbox.clock);
785
786 consPrint("\tcon -m %d\n", maxcon);
787 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve);
788
789 rlock(&cbox.clock);
790 for(con = cbox.chead; con != nil; con = con->cnext){
791 consPrint("\t%s\n", con->name);
792 }
793 runlock(&cbox.clock);
794
795 return 1;
796 }
797
798 void
conInit(void)799 conInit(void)
800 {
801 cbox.arendez.l = &cbox.alock;
802
803 cbox.maxcon = NConInit;
804 cbox.msize = NMsizeInit;
805
806 cliAddCmd("con", cmdCon);
807 cliAddCmd("who", cmdWho);
808 }
809