1368c31abSDavid du Colombier /*
2368c31abSDavid du Colombier * Multiplexed Venti client. It would be nice if we
3368c31abSDavid du Colombier * could turn this into a generic library routine rather
4368c31abSDavid du Colombier * than keep it Venti specific. A user-level 9P client
5368c31abSDavid du Colombier * could use something like this too.
6368c31abSDavid du Colombier *
7368c31abSDavid du Colombier * (Actually it does - this should be replaced with libmux,
8368c31abSDavid du Colombier * which should be renamed librpcmux.)
9368c31abSDavid du Colombier *
10368c31abSDavid du Colombier * This is a little more complicated than it might be
11368c31abSDavid du Colombier * because we want it to work well within and without libthread.
12368c31abSDavid du Colombier *
13368c31abSDavid du Colombier * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
14368c31abSDavid du Colombier */
15368c31abSDavid du Colombier
166b6b9ac8SDavid du Colombier #include <u.h>
176b6b9ac8SDavid du Colombier #include <libc.h>
186b6b9ac8SDavid du Colombier #include <venti.h>
196b6b9ac8SDavid du Colombier
20368c31abSDavid du Colombier typedef struct Rwait Rwait;
21368c31abSDavid du Colombier struct Rwait
22368c31abSDavid du Colombier {
23368c31abSDavid du Colombier Rendez r;
24368c31abSDavid du Colombier Packet *p;
25368c31abSDavid du Colombier int done;
26368c31abSDavid du Colombier int sleeping;
276b6b9ac8SDavid du Colombier };
286b6b9ac8SDavid du Colombier
29368c31abSDavid du Colombier static int gettag(VtConn*, Rwait*);
30368c31abSDavid du Colombier static void puttag(VtConn*, Rwait*, int);
31368c31abSDavid du Colombier static void muxrpc(VtConn*, Packet*);
326b6b9ac8SDavid du Colombier
336b6b9ac8SDavid du Colombier Packet*
_vtrpc(VtConn * z,Packet * p,VtFcall * tx)34368c31abSDavid du Colombier _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
356b6b9ac8SDavid du Colombier {
36368c31abSDavid du Colombier int i;
37368c31abSDavid du Colombier uchar tag, buf[2], *top;
38368c31abSDavid du Colombier Rwait *r, *rr;
396b6b9ac8SDavid du Colombier
40*9b7bf7dfSDavid du Colombier if(z == nil){
41*9b7bf7dfSDavid du Colombier werrstr("not connected");
42*9b7bf7dfSDavid du Colombier packetfree(p);
43*9b7bf7dfSDavid du Colombier return nil;
44*9b7bf7dfSDavid du Colombier }
45*9b7bf7dfSDavid du Colombier
46368c31abSDavid du Colombier /* must malloc because stack could be private */
47368c31abSDavid du Colombier r = vtmallocz(sizeof(Rwait));
48368c31abSDavid du Colombier
49368c31abSDavid du Colombier qlock(&z->lk);
50368c31abSDavid du Colombier r->r.l = &z->lk;
51368c31abSDavid du Colombier tag = gettag(z, r);
52368c31abSDavid du Colombier if(tx){
53368c31abSDavid du Colombier /* vtfcallrpc can't print packet because it doesn't have tag */
54368c31abSDavid du Colombier tx->tag = tag;
55368c31abSDavid du Colombier if(chattyventi)
56368c31abSDavid du Colombier fprint(2, "%s -> %F\n", argv0, tx);
576b6b9ac8SDavid du Colombier }
586b6b9ac8SDavid du Colombier
59368c31abSDavid du Colombier /* slam tag into packet */
60368c31abSDavid du Colombier top = packetpeek(p, buf, 0, 2);
61368c31abSDavid du Colombier if(top == nil){
62368c31abSDavid du Colombier packetfree(p);
63368c31abSDavid du Colombier return nil;
646b6b9ac8SDavid du Colombier }
65368c31abSDavid du Colombier if(top == buf){
66368c31abSDavid du Colombier werrstr("first two bytes must be in same packet fragment");
67368c31abSDavid du Colombier packetfree(p);
68368c31abSDavid du Colombier vtfree(r);
69368c31abSDavid du Colombier return nil;
706b6b9ac8SDavid du Colombier }
71368c31abSDavid du Colombier top[1] = tag;
72368c31abSDavid du Colombier qunlock(&z->lk);
73368c31abSDavid du Colombier if(vtsend(z, p) < 0){
74368c31abSDavid du Colombier vtfree(r);
756b6b9ac8SDavid du Colombier return nil;
766b6b9ac8SDavid du Colombier }
776b6b9ac8SDavid du Colombier
78368c31abSDavid du Colombier qlock(&z->lk);
79368c31abSDavid du Colombier /* wait for the muxer to give us our packet */
80368c31abSDavid du Colombier r->sleeping = 1;
81368c31abSDavid du Colombier z->nsleep++;
82368c31abSDavid du Colombier while(z->muxer && !r->done)
83368c31abSDavid du Colombier rsleep(&r->r);
84368c31abSDavid du Colombier z->nsleep--;
85368c31abSDavid du Colombier r->sleeping = 0;
866b6b9ac8SDavid du Colombier
87368c31abSDavid du Colombier /* if not done, there's no muxer: start muxing */
88368c31abSDavid du Colombier if(!r->done){
89368c31abSDavid du Colombier if(z->muxer)
90368c31abSDavid du Colombier abort();
91368c31abSDavid du Colombier z->muxer = 1;
92368c31abSDavid du Colombier while(!r->done){
93368c31abSDavid du Colombier qunlock(&z->lk);
94368c31abSDavid du Colombier if((p = vtrecv(z)) == nil){
95368c31abSDavid du Colombier werrstr("unexpected eof on venti connection");
96368c31abSDavid du Colombier z->muxer = 0;
97368c31abSDavid du Colombier vtfree(r);
98368c31abSDavid du Colombier return nil;
996b6b9ac8SDavid du Colombier }
100368c31abSDavid du Colombier qlock(&z->lk);
101368c31abSDavid du Colombier muxrpc(z, p);
102368c31abSDavid du Colombier }
103368c31abSDavid du Colombier z->muxer = 0;
104368c31abSDavid du Colombier /* if there is anyone else sleeping, wake first unfinished to mux */
105368c31abSDavid du Colombier if(z->nsleep)
106368c31abSDavid du Colombier for(i=0; i<256; i++){
107368c31abSDavid du Colombier rr = z->wait[i];
108368c31abSDavid du Colombier if(rr && rr->sleeping && !rr->done){
109368c31abSDavid du Colombier rwakeup(&rr->r);
1106b6b9ac8SDavid du Colombier break;
1116b6b9ac8SDavid du Colombier }
1126b6b9ac8SDavid du Colombier }
1136b6b9ac8SDavid du Colombier }
1146b6b9ac8SDavid du Colombier
115368c31abSDavid du Colombier p = r->p;
116368c31abSDavid du Colombier puttag(z, r, tag);
117368c31abSDavid du Colombier vtfree(r);
118368c31abSDavid du Colombier qunlock(&z->lk);
119368c31abSDavid du Colombier return p;
120368c31abSDavid du Colombier }
1216b6b9ac8SDavid du Colombier
122368c31abSDavid du Colombier Packet*
vtrpc(VtConn * z,Packet * p)123368c31abSDavid du Colombier vtrpc(VtConn *z, Packet *p)
1246b6b9ac8SDavid du Colombier {
125368c31abSDavid du Colombier return _vtrpc(z, p, nil);
1266b6b9ac8SDavid du Colombier }
1276b6b9ac8SDavid du Colombier
128368c31abSDavid du Colombier static int
gettag(VtConn * z,Rwait * r)129368c31abSDavid du Colombier gettag(VtConn *z, Rwait *r)
1306b6b9ac8SDavid du Colombier {
1316b6b9ac8SDavid du Colombier int i;
1326b6b9ac8SDavid du Colombier
133368c31abSDavid du Colombier Again:
134368c31abSDavid du Colombier while(z->ntag == 256)
135368c31abSDavid du Colombier rsleep(&z->tagrend);
136368c31abSDavid du Colombier for(i=0; i<256; i++)
137368c31abSDavid du Colombier if(z->wait[i] == 0){
138368c31abSDavid du Colombier z->ntag++;
139368c31abSDavid du Colombier z->wait[i] = r;
140368c31abSDavid du Colombier return i;
1416b6b9ac8SDavid du Colombier }
142368c31abSDavid du Colombier fprint(2, "libventi: ntag botch\n");
143368c31abSDavid du Colombier goto Again;
14439734e7eSDavid du Colombier }
1456b6b9ac8SDavid du Colombier
146368c31abSDavid du Colombier static void
puttag(VtConn * z,Rwait * r,int tag)147368c31abSDavid du Colombier puttag(VtConn *z, Rwait *r, int tag)
148368c31abSDavid du Colombier {
149368c31abSDavid du Colombier assert(z->wait[tag] == r);
150368c31abSDavid du Colombier z->wait[tag] = nil;
151368c31abSDavid du Colombier z->ntag--;
152368c31abSDavid du Colombier rwakeup(&z->tagrend);
1536b6b9ac8SDavid du Colombier }
1546b6b9ac8SDavid du Colombier
155368c31abSDavid du Colombier static void
muxrpc(VtConn * z,Packet * p)156368c31abSDavid du Colombier muxrpc(VtConn *z, Packet *p)
157368c31abSDavid du Colombier {
158368c31abSDavid du Colombier uchar tag, buf[2], *top;
159368c31abSDavid du Colombier Rwait *r;
1606b6b9ac8SDavid du Colombier
161368c31abSDavid du Colombier if((top = packetpeek(p, buf, 0, 2)) == nil){
162368c31abSDavid du Colombier fprint(2, "libventi: short packet in vtrpc\n");
163368c31abSDavid du Colombier packetfree(p);
164368c31abSDavid du Colombier return;
165368c31abSDavid du Colombier }
1666b6b9ac8SDavid du Colombier
167368c31abSDavid du Colombier tag = top[1];
168368c31abSDavid du Colombier if((r = z->wait[tag]) == nil){
169368c31abSDavid du Colombier fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
170368c31abSDavid du Colombier abort();
171368c31abSDavid du Colombier packetfree(p);
172368c31abSDavid du Colombier return;
173368c31abSDavid du Colombier }
1746b6b9ac8SDavid du Colombier
175368c31abSDavid du Colombier r->p = p;
176368c31abSDavid du Colombier r->done = 1;
177368c31abSDavid du Colombier rwakeup(&r->r);
1786b6b9ac8SDavid du Colombier }
1796b6b9ac8SDavid du Colombier
180