xref: /plan9/sys/src/libventi/send.c (revision 9ba92a1a1d8a653f6050241dbdbd039f2ecb1dc2)
1368c31abSDavid du Colombier #include <u.h>
2368c31abSDavid du Colombier #include <libc.h>
3368c31abSDavid du Colombier #include <venti.h>
4368c31abSDavid du Colombier #include "queue.h"
5368c31abSDavid du Colombier 
6368c31abSDavid du Colombier long ventisendbytes, ventisendpackets;
7368c31abSDavid du Colombier long ventirecvbytes, ventirecvpackets;
8368c31abSDavid du Colombier 
9368c31abSDavid du Colombier static int
_vtsend(VtConn * z,Packet * p)10368c31abSDavid du Colombier _vtsend(VtConn *z, Packet *p)
11368c31abSDavid du Colombier {
12368c31abSDavid du Colombier 	IOchunk ioc;
13368c31abSDavid du Colombier 	int n, tot;
14368c31abSDavid du Colombier 	uchar buf[2];
15368c31abSDavid du Colombier 
16368c31abSDavid du Colombier 	if(z->state != VtStateConnected) {
17368c31abSDavid du Colombier 		werrstr("session not connected");
18368c31abSDavid du Colombier 		return -1;
19368c31abSDavid du Colombier 	}
20368c31abSDavid du Colombier 
21368c31abSDavid du Colombier 	/* add framing */
22368c31abSDavid du Colombier 	n = packetsize(p);
23368c31abSDavid du Colombier 	if(n >= (1<<16)) {
24368c31abSDavid du Colombier 		werrstr("packet too large");
25368c31abSDavid du Colombier 		packetfree(p);
26368c31abSDavid du Colombier 		return -1;
27368c31abSDavid du Colombier 	}
28368c31abSDavid du Colombier 	buf[0] = n>>8;
29368c31abSDavid du Colombier 	buf[1] = n;
30368c31abSDavid du Colombier 	packetprefix(p, buf, 2);
31368c31abSDavid du Colombier 	ventisendbytes += n+2;
32368c31abSDavid du Colombier 	ventisendpackets++;
33368c31abSDavid du Colombier 
34368c31abSDavid du Colombier 	tot = 0;
35368c31abSDavid du Colombier 	for(;;){
36368c31abSDavid du Colombier 		n = packetfragments(p, &ioc, 1, 0);
37368c31abSDavid du Colombier 		if(n == 0)
38368c31abSDavid du Colombier 			break;
39368c31abSDavid du Colombier 		if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
40368c31abSDavid du Colombier 			vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
41368c31abSDavid du Colombier 			packetfree(p);
42*9ba92a1aSDavid du Colombier 			return -1;
43368c31abSDavid du Colombier 		}
44368c31abSDavid du Colombier 		packetconsume(p, nil, ioc.len);
45368c31abSDavid du Colombier 		tot += ioc.len;
46368c31abSDavid du Colombier 	}
47368c31abSDavid du Colombier 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
48368c31abSDavid du Colombier 	packetfree(p);
49368c31abSDavid du Colombier 	return 1;
50368c31abSDavid du Colombier }
51368c31abSDavid du Colombier 
52368c31abSDavid du Colombier static int
interrupted(void)53368c31abSDavid du Colombier interrupted(void)
54368c31abSDavid du Colombier {
55368c31abSDavid du Colombier 	char e[ERRMAX];
56368c31abSDavid du Colombier 
57368c31abSDavid du Colombier 	rerrstr(e, sizeof e);
58368c31abSDavid du Colombier 	return strstr(e, "interrupted") != nil;
59368c31abSDavid du Colombier }
60368c31abSDavid du Colombier 
61368c31abSDavid du Colombier 
62368c31abSDavid du Colombier static Packet*
_vtrecv(VtConn * z)63368c31abSDavid du Colombier _vtrecv(VtConn *z)
64368c31abSDavid du Colombier {
65368c31abSDavid du Colombier 	uchar buf[10], *b;
66368c31abSDavid du Colombier 	int n;
67368c31abSDavid du Colombier 	Packet *p;
68368c31abSDavid du Colombier 	int size, len;
69368c31abSDavid du Colombier 
70368c31abSDavid du Colombier 	if(z->state != VtStateConnected) {
71368c31abSDavid du Colombier 		werrstr("session not connected");
72368c31abSDavid du Colombier 		return nil;
73368c31abSDavid du Colombier 	}
74368c31abSDavid du Colombier 
75368c31abSDavid du Colombier 	p = z->part;
76368c31abSDavid du Colombier 	/* get enough for head size */
77368c31abSDavid du Colombier 	size = packetsize(p);
78368c31abSDavid du Colombier 	while(size < 2) {
79368c31abSDavid du Colombier 		b = packettrailer(p, 2);
80368c31abSDavid du Colombier 		assert(b != nil);
81368c31abSDavid du Colombier 		if(0) fprint(2, "%d read hdr\n", getpid());
82368c31abSDavid du Colombier 		n = read(z->infd, b, 2);
83368c31abSDavid du Colombier 		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
84368c31abSDavid du Colombier 		if(n==0 || (n<0 && !interrupted()))
85368c31abSDavid du Colombier 			goto Err;
86368c31abSDavid du Colombier 		size += n;
87368c31abSDavid du Colombier 		packettrim(p, 0, size);
88368c31abSDavid du Colombier 	}
89368c31abSDavid du Colombier 
90368c31abSDavid du Colombier 	if(packetconsume(p, buf, 2) < 0)
91368c31abSDavid du Colombier 		goto Err;
92368c31abSDavid du Colombier 	len = (buf[0] << 8) | buf[1];
93368c31abSDavid du Colombier 	size -= 2;
94368c31abSDavid du Colombier 
95368c31abSDavid du Colombier 	while(size < len) {
96368c31abSDavid du Colombier 		n = len - size;
97368c31abSDavid du Colombier 		if(n > MaxFragSize)
98368c31abSDavid du Colombier 			n = MaxFragSize;
99368c31abSDavid du Colombier 		b = packettrailer(p, n);
100368c31abSDavid du Colombier 		if(0) fprint(2, "%d read body %d\n", getpid(), n);
101368c31abSDavid du Colombier 		n = read(z->infd, b, n);
102368c31abSDavid du Colombier 		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
103368c31abSDavid du Colombier 		if(n > 0)
104368c31abSDavid du Colombier 			size += n;
105368c31abSDavid du Colombier 		packettrim(p, 0, size);
106368c31abSDavid du Colombier 		if(n==0 || (n<0 && !interrupted()))
107368c31abSDavid du Colombier 			goto Err;
108368c31abSDavid du Colombier 	}
109368c31abSDavid du Colombier 	ventirecvbytes += len;
110368c31abSDavid du Colombier 	ventirecvpackets++;
111368c31abSDavid du Colombier 	p = packetsplit(p, len);
112368c31abSDavid du Colombier 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
113368c31abSDavid du Colombier 	return p;
114368c31abSDavid du Colombier Err:
115368c31abSDavid du Colombier 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
116368c31abSDavid du Colombier 	return nil;
117368c31abSDavid du Colombier }
118368c31abSDavid du Colombier 
119368c31abSDavid du Colombier /*
120368c31abSDavid du Colombier  * If you fork off two procs running vtrecvproc and vtsendproc,
121368c31abSDavid du Colombier  * then vtrecv/vtsend (and thus vtrpc) will never block except on
122368c31abSDavid du Colombier  * rendevouses, which is nice when it's running in one thread of many.
123368c31abSDavid du Colombier  */
124368c31abSDavid du Colombier void
vtrecvproc(void * v)125368c31abSDavid du Colombier vtrecvproc(void *v)
126368c31abSDavid du Colombier {
127368c31abSDavid du Colombier 	Packet *p;
128368c31abSDavid du Colombier 	VtConn *z;
129368c31abSDavid du Colombier 	Queue *q;
130368c31abSDavid du Colombier 
131368c31abSDavid du Colombier 	z = v;
132368c31abSDavid du Colombier 	q = _vtqalloc();
133368c31abSDavid du Colombier 
134368c31abSDavid du Colombier 	qlock(&z->lk);
135368c31abSDavid du Colombier 	z->readq = q;
136368c31abSDavid du Colombier 	qlock(&z->inlk);
137368c31abSDavid du Colombier 	rwakeup(&z->rpcfork);
138368c31abSDavid du Colombier 	qunlock(&z->lk);
139368c31abSDavid du Colombier 
140368c31abSDavid du Colombier 	while((p = _vtrecv(z)) != nil)
141368c31abSDavid du Colombier 		if(_vtqsend(q, p) < 0){
142368c31abSDavid du Colombier 			packetfree(p);
143368c31abSDavid du Colombier 			break;
144368c31abSDavid du Colombier 		}
145368c31abSDavid du Colombier 	qunlock(&z->inlk);
146368c31abSDavid du Colombier 	qlock(&z->lk);
147368c31abSDavid du Colombier 	_vtqhangup(q);
148368c31abSDavid du Colombier 	while((p = _vtnbqrecv(q)) != nil)
149368c31abSDavid du Colombier 		packetfree(p);
150368c31abSDavid du Colombier 	_vtqdecref(q);
151368c31abSDavid du Colombier 	z->readq = nil;
152368c31abSDavid du Colombier 	rwakeup(&z->rpcfork);
153368c31abSDavid du Colombier 	qunlock(&z->lk);
154368c31abSDavid du Colombier 	vthangup(z);
155368c31abSDavid du Colombier }
156368c31abSDavid du Colombier 
157368c31abSDavid du Colombier void
vtsendproc(void * v)158368c31abSDavid du Colombier vtsendproc(void *v)
159368c31abSDavid du Colombier {
160368c31abSDavid du Colombier 	Queue *q;
161368c31abSDavid du Colombier 	Packet *p;
162368c31abSDavid du Colombier 	VtConn *z;
163368c31abSDavid du Colombier 
164368c31abSDavid du Colombier 	z = v;
165368c31abSDavid du Colombier 	q = _vtqalloc();
166368c31abSDavid du Colombier 
167368c31abSDavid du Colombier 	qlock(&z->lk);
168368c31abSDavid du Colombier 	z->writeq = q;
169368c31abSDavid du Colombier 	qlock(&z->outlk);
170368c31abSDavid du Colombier 	rwakeup(&z->rpcfork);
171368c31abSDavid du Colombier 	qunlock(&z->lk);
172368c31abSDavid du Colombier 
173368c31abSDavid du Colombier 	while((p = _vtqrecv(q)) != nil)
174368c31abSDavid du Colombier 		if(_vtsend(z, p) < 0)
175368c31abSDavid du Colombier 			break;
176368c31abSDavid du Colombier 	qunlock(&z->outlk);
177368c31abSDavid du Colombier 	qlock(&z->lk);
178368c31abSDavid du Colombier 	_vtqhangup(q);
179368c31abSDavid du Colombier 	while((p = _vtnbqrecv(q)) != nil)
180368c31abSDavid du Colombier 		packetfree(p);
181368c31abSDavid du Colombier 	_vtqdecref(q);
182368c31abSDavid du Colombier 	z->writeq = nil;
183368c31abSDavid du Colombier 	rwakeup(&z->rpcfork);
184368c31abSDavid du Colombier 	qunlock(&z->lk);
185368c31abSDavid du Colombier 	return;
186368c31abSDavid du Colombier }
187368c31abSDavid du Colombier 
188368c31abSDavid du Colombier Packet*
vtrecv(VtConn * z)189368c31abSDavid du Colombier vtrecv(VtConn *z)
190368c31abSDavid du Colombier {
191368c31abSDavid du Colombier 	Packet *p;
192368c31abSDavid du Colombier 	Queue *q;
193368c31abSDavid du Colombier 
194368c31abSDavid du Colombier 	qlock(&z->lk);
195368c31abSDavid du Colombier 	if(z->state != VtStateConnected){
196368c31abSDavid du Colombier 		werrstr("not connected");
197368c31abSDavid du Colombier 		qunlock(&z->lk);
198368c31abSDavid du Colombier 		return nil;
199368c31abSDavid du Colombier 	}
200368c31abSDavid du Colombier 	if(z->readq){
201368c31abSDavid du Colombier 		q = _vtqincref(z->readq);
202368c31abSDavid du Colombier 		qunlock(&z->lk);
203368c31abSDavid du Colombier 		p = _vtqrecv(q);
204368c31abSDavid du Colombier 		_vtqdecref(q);
205368c31abSDavid du Colombier 		return p;
206368c31abSDavid du Colombier 	}
207368c31abSDavid du Colombier 
208368c31abSDavid du Colombier 	qlock(&z->inlk);
209368c31abSDavid du Colombier 	qunlock(&z->lk);
210368c31abSDavid du Colombier 	p = _vtrecv(z);
211368c31abSDavid du Colombier 	qunlock(&z->inlk);
212368c31abSDavid du Colombier 	if(!p)
213368c31abSDavid du Colombier 		vthangup(z);
214368c31abSDavid du Colombier 	return p;
215368c31abSDavid du Colombier }
216368c31abSDavid du Colombier 
217368c31abSDavid du Colombier int
vtsend(VtConn * z,Packet * p)218368c31abSDavid du Colombier vtsend(VtConn *z, Packet *p)
219368c31abSDavid du Colombier {
220368c31abSDavid du Colombier 	Queue *q;
221368c31abSDavid du Colombier 
222368c31abSDavid du Colombier 	qlock(&z->lk);
223368c31abSDavid du Colombier 	if(z->state != VtStateConnected){
224368c31abSDavid du Colombier 		packetfree(p);
225368c31abSDavid du Colombier 		werrstr("not connected");
226368c31abSDavid du Colombier 		qunlock(&z->lk);
227368c31abSDavid du Colombier 		return -1;
228368c31abSDavid du Colombier 	}
229368c31abSDavid du Colombier 	if(z->writeq){
230368c31abSDavid du Colombier 		q = _vtqincref(z->writeq);
231368c31abSDavid du Colombier 		qunlock(&z->lk);
232368c31abSDavid du Colombier 		if(_vtqsend(q, p) < 0){
233368c31abSDavid du Colombier 			_vtqdecref(q);
234368c31abSDavid du Colombier 			packetfree(p);
235368c31abSDavid du Colombier 			return -1;
236368c31abSDavid du Colombier 		}
237368c31abSDavid du Colombier 		_vtqdecref(q);
238368c31abSDavid du Colombier 		return 0;
239368c31abSDavid du Colombier 	}
240368c31abSDavid du Colombier 
241368c31abSDavid du Colombier 	qlock(&z->outlk);
242368c31abSDavid du Colombier 	qunlock(&z->lk);
243368c31abSDavid du Colombier 	if(_vtsend(z, p) < 0){
244368c31abSDavid du Colombier 		qunlock(&z->outlk);
245368c31abSDavid du Colombier 		vthangup(z);
246368c31abSDavid du Colombier 		return -1;
247368c31abSDavid du Colombier 	}
248368c31abSDavid du Colombier 	qunlock(&z->outlk);
249368c31abSDavid du Colombier 	return 0;
250368c31abSDavid du Colombier }
251368c31abSDavid du Colombier 
252