1368c31abSDavid du Colombier #include <u.h>
2368c31abSDavid du Colombier #include <libc.h>
3368c31abSDavid du Colombier #include <venti.h>
4368c31abSDavid du Colombier #include "queue.h"
5368c31abSDavid du Colombier
/*
 * Traffic counters.  _vtsend adds the payload size plus the two-byte
 * length header to ventisendbytes and bumps ventisendpackets for each
 * packet it frames.  _vtrecv adds the payload length (header excluded)
 * to ventirecvbytes and bumps ventirecvpackets for each packet it
 * returns.  Nothing in this file locks around the updates.
 */
long ventisendbytes;
long ventisendpackets;
long ventirecvbytes;
long ventirecvpackets;
8368c31abSDavid du Colombier
9368c31abSDavid du Colombier static int
_vtsend(VtConn * z,Packet * p)10368c31abSDavid du Colombier _vtsend(VtConn *z, Packet *p)
11368c31abSDavid du Colombier {
12368c31abSDavid du Colombier IOchunk ioc;
13368c31abSDavid du Colombier int n, tot;
14368c31abSDavid du Colombier uchar buf[2];
15368c31abSDavid du Colombier
16368c31abSDavid du Colombier if(z->state != VtStateConnected) {
17368c31abSDavid du Colombier werrstr("session not connected");
18368c31abSDavid du Colombier return -1;
19368c31abSDavid du Colombier }
20368c31abSDavid du Colombier
21368c31abSDavid du Colombier /* add framing */
22368c31abSDavid du Colombier n = packetsize(p);
23368c31abSDavid du Colombier if(n >= (1<<16)) {
24368c31abSDavid du Colombier werrstr("packet too large");
25368c31abSDavid du Colombier packetfree(p);
26368c31abSDavid du Colombier return -1;
27368c31abSDavid du Colombier }
28368c31abSDavid du Colombier buf[0] = n>>8;
29368c31abSDavid du Colombier buf[1] = n;
30368c31abSDavid du Colombier packetprefix(p, buf, 2);
31368c31abSDavid du Colombier ventisendbytes += n+2;
32368c31abSDavid du Colombier ventisendpackets++;
33368c31abSDavid du Colombier
34368c31abSDavid du Colombier tot = 0;
35368c31abSDavid du Colombier for(;;){
36368c31abSDavid du Colombier n = packetfragments(p, &ioc, 1, 0);
37368c31abSDavid du Colombier if(n == 0)
38368c31abSDavid du Colombier break;
39368c31abSDavid du Colombier if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
40368c31abSDavid du Colombier vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
41368c31abSDavid du Colombier packetfree(p);
42*9ba92a1aSDavid du Colombier return -1;
43368c31abSDavid du Colombier }
44368c31abSDavid du Colombier packetconsume(p, nil, ioc.len);
45368c31abSDavid du Colombier tot += ioc.len;
46368c31abSDavid du Colombier }
47368c31abSDavid du Colombier vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
48368c31abSDavid du Colombier packetfree(p);
49368c31abSDavid du Colombier return 1;
50368c31abSDavid du Colombier }
51368c31abSDavid du Colombier
52368c31abSDavid du Colombier static int
interrupted(void)53368c31abSDavid du Colombier interrupted(void)
54368c31abSDavid du Colombier {
55368c31abSDavid du Colombier char e[ERRMAX];
56368c31abSDavid du Colombier
57368c31abSDavid du Colombier rerrstr(e, sizeof e);
58368c31abSDavid du Colombier return strstr(e, "interrupted") != nil;
59368c31abSDavid du Colombier }
60368c31abSDavid du Colombier
61368c31abSDavid du Colombier
62368c31abSDavid du Colombier static Packet*
_vtrecv(VtConn * z)63368c31abSDavid du Colombier _vtrecv(VtConn *z)
64368c31abSDavid du Colombier {
65368c31abSDavid du Colombier uchar buf[10], *b;
66368c31abSDavid du Colombier int n;
67368c31abSDavid du Colombier Packet *p;
68368c31abSDavid du Colombier int size, len;
69368c31abSDavid du Colombier
70368c31abSDavid du Colombier if(z->state != VtStateConnected) {
71368c31abSDavid du Colombier werrstr("session not connected");
72368c31abSDavid du Colombier return nil;
73368c31abSDavid du Colombier }
74368c31abSDavid du Colombier
75368c31abSDavid du Colombier p = z->part;
76368c31abSDavid du Colombier /* get enough for head size */
77368c31abSDavid du Colombier size = packetsize(p);
78368c31abSDavid du Colombier while(size < 2) {
79368c31abSDavid du Colombier b = packettrailer(p, 2);
80368c31abSDavid du Colombier assert(b != nil);
81368c31abSDavid du Colombier if(0) fprint(2, "%d read hdr\n", getpid());
82368c31abSDavid du Colombier n = read(z->infd, b, 2);
83368c31abSDavid du Colombier if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
84368c31abSDavid du Colombier if(n==0 || (n<0 && !interrupted()))
85368c31abSDavid du Colombier goto Err;
86368c31abSDavid du Colombier size += n;
87368c31abSDavid du Colombier packettrim(p, 0, size);
88368c31abSDavid du Colombier }
89368c31abSDavid du Colombier
90368c31abSDavid du Colombier if(packetconsume(p, buf, 2) < 0)
91368c31abSDavid du Colombier goto Err;
92368c31abSDavid du Colombier len = (buf[0] << 8) | buf[1];
93368c31abSDavid du Colombier size -= 2;
94368c31abSDavid du Colombier
95368c31abSDavid du Colombier while(size < len) {
96368c31abSDavid du Colombier n = len - size;
97368c31abSDavid du Colombier if(n > MaxFragSize)
98368c31abSDavid du Colombier n = MaxFragSize;
99368c31abSDavid du Colombier b = packettrailer(p, n);
100368c31abSDavid du Colombier if(0) fprint(2, "%d read body %d\n", getpid(), n);
101368c31abSDavid du Colombier n = read(z->infd, b, n);
102368c31abSDavid du Colombier if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
103368c31abSDavid du Colombier if(n > 0)
104368c31abSDavid du Colombier size += n;
105368c31abSDavid du Colombier packettrim(p, 0, size);
106368c31abSDavid du Colombier if(n==0 || (n<0 && !interrupted()))
107368c31abSDavid du Colombier goto Err;
108368c31abSDavid du Colombier }
109368c31abSDavid du Colombier ventirecvbytes += len;
110368c31abSDavid du Colombier ventirecvpackets++;
111368c31abSDavid du Colombier p = packetsplit(p, len);
112368c31abSDavid du Colombier vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
113368c31abSDavid du Colombier return p;
114368c31abSDavid du Colombier Err:
115368c31abSDavid du Colombier vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
116368c31abSDavid du Colombier return nil;
117368c31abSDavid du Colombier }
118368c31abSDavid du Colombier
/*
 * If you fork off two procs running vtrecvproc and vtsendproc,
 * then vtrecv/vtsend (and thus vtrpc) will never block except on
 * rendezvous, which is nice when it's running in one thread of many.
 */
124368c31abSDavid du Colombier void
vtrecvproc(void * v)125368c31abSDavid du Colombier vtrecvproc(void *v)
126368c31abSDavid du Colombier {
127368c31abSDavid du Colombier Packet *p;
128368c31abSDavid du Colombier VtConn *z;
129368c31abSDavid du Colombier Queue *q;
130368c31abSDavid du Colombier
131368c31abSDavid du Colombier z = v;
132368c31abSDavid du Colombier q = _vtqalloc();
133368c31abSDavid du Colombier
134368c31abSDavid du Colombier qlock(&z->lk);
135368c31abSDavid du Colombier z->readq = q;
136368c31abSDavid du Colombier qlock(&z->inlk);
137368c31abSDavid du Colombier rwakeup(&z->rpcfork);
138368c31abSDavid du Colombier qunlock(&z->lk);
139368c31abSDavid du Colombier
140368c31abSDavid du Colombier while((p = _vtrecv(z)) != nil)
141368c31abSDavid du Colombier if(_vtqsend(q, p) < 0){
142368c31abSDavid du Colombier packetfree(p);
143368c31abSDavid du Colombier break;
144368c31abSDavid du Colombier }
145368c31abSDavid du Colombier qunlock(&z->inlk);
146368c31abSDavid du Colombier qlock(&z->lk);
147368c31abSDavid du Colombier _vtqhangup(q);
148368c31abSDavid du Colombier while((p = _vtnbqrecv(q)) != nil)
149368c31abSDavid du Colombier packetfree(p);
150368c31abSDavid du Colombier _vtqdecref(q);
151368c31abSDavid du Colombier z->readq = nil;
152368c31abSDavid du Colombier rwakeup(&z->rpcfork);
153368c31abSDavid du Colombier qunlock(&z->lk);
154368c31abSDavid du Colombier vthangup(z);
155368c31abSDavid du Colombier }
156368c31abSDavid du Colombier
157368c31abSDavid du Colombier void
vtsendproc(void * v)158368c31abSDavid du Colombier vtsendproc(void *v)
159368c31abSDavid du Colombier {
160368c31abSDavid du Colombier Queue *q;
161368c31abSDavid du Colombier Packet *p;
162368c31abSDavid du Colombier VtConn *z;
163368c31abSDavid du Colombier
164368c31abSDavid du Colombier z = v;
165368c31abSDavid du Colombier q = _vtqalloc();
166368c31abSDavid du Colombier
167368c31abSDavid du Colombier qlock(&z->lk);
168368c31abSDavid du Colombier z->writeq = q;
169368c31abSDavid du Colombier qlock(&z->outlk);
170368c31abSDavid du Colombier rwakeup(&z->rpcfork);
171368c31abSDavid du Colombier qunlock(&z->lk);
172368c31abSDavid du Colombier
173368c31abSDavid du Colombier while((p = _vtqrecv(q)) != nil)
174368c31abSDavid du Colombier if(_vtsend(z, p) < 0)
175368c31abSDavid du Colombier break;
176368c31abSDavid du Colombier qunlock(&z->outlk);
177368c31abSDavid du Colombier qlock(&z->lk);
178368c31abSDavid du Colombier _vtqhangup(q);
179368c31abSDavid du Colombier while((p = _vtnbqrecv(q)) != nil)
180368c31abSDavid du Colombier packetfree(p);
181368c31abSDavid du Colombier _vtqdecref(q);
182368c31abSDavid du Colombier z->writeq = nil;
183368c31abSDavid du Colombier rwakeup(&z->rpcfork);
184368c31abSDavid du Colombier qunlock(&z->lk);
185368c31abSDavid du Colombier return;
186368c31abSDavid du Colombier }
187368c31abSDavid du Colombier
188368c31abSDavid du Colombier Packet*
vtrecv(VtConn * z)189368c31abSDavid du Colombier vtrecv(VtConn *z)
190368c31abSDavid du Colombier {
191368c31abSDavid du Colombier Packet *p;
192368c31abSDavid du Colombier Queue *q;
193368c31abSDavid du Colombier
194368c31abSDavid du Colombier qlock(&z->lk);
195368c31abSDavid du Colombier if(z->state != VtStateConnected){
196368c31abSDavid du Colombier werrstr("not connected");
197368c31abSDavid du Colombier qunlock(&z->lk);
198368c31abSDavid du Colombier return nil;
199368c31abSDavid du Colombier }
200368c31abSDavid du Colombier if(z->readq){
201368c31abSDavid du Colombier q = _vtqincref(z->readq);
202368c31abSDavid du Colombier qunlock(&z->lk);
203368c31abSDavid du Colombier p = _vtqrecv(q);
204368c31abSDavid du Colombier _vtqdecref(q);
205368c31abSDavid du Colombier return p;
206368c31abSDavid du Colombier }
207368c31abSDavid du Colombier
208368c31abSDavid du Colombier qlock(&z->inlk);
209368c31abSDavid du Colombier qunlock(&z->lk);
210368c31abSDavid du Colombier p = _vtrecv(z);
211368c31abSDavid du Colombier qunlock(&z->inlk);
212368c31abSDavid du Colombier if(!p)
213368c31abSDavid du Colombier vthangup(z);
214368c31abSDavid du Colombier return p;
215368c31abSDavid du Colombier }
216368c31abSDavid du Colombier
217368c31abSDavid du Colombier int
vtsend(VtConn * z,Packet * p)218368c31abSDavid du Colombier vtsend(VtConn *z, Packet *p)
219368c31abSDavid du Colombier {
220368c31abSDavid du Colombier Queue *q;
221368c31abSDavid du Colombier
222368c31abSDavid du Colombier qlock(&z->lk);
223368c31abSDavid du Colombier if(z->state != VtStateConnected){
224368c31abSDavid du Colombier packetfree(p);
225368c31abSDavid du Colombier werrstr("not connected");
226368c31abSDavid du Colombier qunlock(&z->lk);
227368c31abSDavid du Colombier return -1;
228368c31abSDavid du Colombier }
229368c31abSDavid du Colombier if(z->writeq){
230368c31abSDavid du Colombier q = _vtqincref(z->writeq);
231368c31abSDavid du Colombier qunlock(&z->lk);
232368c31abSDavid du Colombier if(_vtqsend(q, p) < 0){
233368c31abSDavid du Colombier _vtqdecref(q);
234368c31abSDavid du Colombier packetfree(p);
235368c31abSDavid du Colombier return -1;
236368c31abSDavid du Colombier }
237368c31abSDavid du Colombier _vtqdecref(q);
238368c31abSDavid du Colombier return 0;
239368c31abSDavid du Colombier }
240368c31abSDavid du Colombier
241368c31abSDavid du Colombier qlock(&z->outlk);
242368c31abSDavid du Colombier qunlock(&z->lk);
243368c31abSDavid du Colombier if(_vtsend(z, p) < 0){
244368c31abSDavid du Colombier qunlock(&z->outlk);
245368c31abSDavid du Colombier vthangup(z);
246368c31abSDavid du Colombier return -1;
247368c31abSDavid du Colombier }
248368c31abSDavid du Colombier qunlock(&z->outlk);
249368c31abSDavid du Colombier return 0;
250368c31abSDavid du Colombier }
251368c31abSDavid du Colombier
252