xref: /plan9/sys/src/libventi/send.c (revision 9ba92a1a1d8a653f6050241dbdbd039f2ecb1dc2)
1 #include <u.h>
2 #include <libc.h>
3 #include <venti.h>
4 #include "queue.h"
5 
6 long ventisendbytes, ventisendpackets;
7 long ventirecvbytes, ventirecvpackets;
8 
9 static int
_vtsend(VtConn * z,Packet * p)10 _vtsend(VtConn *z, Packet *p)
11 {
12 	IOchunk ioc;
13 	int n, tot;
14 	uchar buf[2];
15 
16 	if(z->state != VtStateConnected) {
17 		werrstr("session not connected");
18 		return -1;
19 	}
20 
21 	/* add framing */
22 	n = packetsize(p);
23 	if(n >= (1<<16)) {
24 		werrstr("packet too large");
25 		packetfree(p);
26 		return -1;
27 	}
28 	buf[0] = n>>8;
29 	buf[1] = n;
30 	packetprefix(p, buf, 2);
31 	ventisendbytes += n+2;
32 	ventisendpackets++;
33 
34 	tot = 0;
35 	for(;;){
36 		n = packetfragments(p, &ioc, 1, 0);
37 		if(n == 0)
38 			break;
39 		if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
40 			vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
41 			packetfree(p);
42 			return -1;
43 		}
44 		packetconsume(p, nil, ioc.len);
45 		tot += ioc.len;
46 	}
47 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
48 	packetfree(p);
49 	return 1;
50 }
51 
52 static int
interrupted(void)53 interrupted(void)
54 {
55 	char e[ERRMAX];
56 
57 	rerrstr(e, sizeof e);
58 	return strstr(e, "interrupted") != nil;
59 }
60 
61 
62 static Packet*
_vtrecv(VtConn * z)63 _vtrecv(VtConn *z)
64 {
65 	uchar buf[10], *b;
66 	int n;
67 	Packet *p;
68 	int size, len;
69 
70 	if(z->state != VtStateConnected) {
71 		werrstr("session not connected");
72 		return nil;
73 	}
74 
75 	p = z->part;
76 	/* get enough for head size */
77 	size = packetsize(p);
78 	while(size < 2) {
79 		b = packettrailer(p, 2);
80 		assert(b != nil);
81 		if(0) fprint(2, "%d read hdr\n", getpid());
82 		n = read(z->infd, b, 2);
83 		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
84 		if(n==0 || (n<0 && !interrupted()))
85 			goto Err;
86 		size += n;
87 		packettrim(p, 0, size);
88 	}
89 
90 	if(packetconsume(p, buf, 2) < 0)
91 		goto Err;
92 	len = (buf[0] << 8) | buf[1];
93 	size -= 2;
94 
95 	while(size < len) {
96 		n = len - size;
97 		if(n > MaxFragSize)
98 			n = MaxFragSize;
99 		b = packettrailer(p, n);
100 		if(0) fprint(2, "%d read body %d\n", getpid(), n);
101 		n = read(z->infd, b, n);
102 		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
103 		if(n > 0)
104 			size += n;
105 		packettrim(p, 0, size);
106 		if(n==0 || (n<0 && !interrupted()))
107 			goto Err;
108 	}
109 	ventirecvbytes += len;
110 	ventirecvpackets++;
111 	p = packetsplit(p, len);
112 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
113 	return p;
114 Err:
115 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
116 	return nil;
117 }
118 
119 /*
120  * If you fork off two procs running vtrecvproc and vtsendproc,
121  * then vtrecv/vtsend (and thus vtrpc) will never block except on
122  * rendevouses, which is nice when it's running in one thread of many.
123  */
124 void
vtrecvproc(void * v)125 vtrecvproc(void *v)
126 {
127 	Packet *p;
128 	VtConn *z;
129 	Queue *q;
130 
131 	z = v;
132 	q = _vtqalloc();
133 
134 	qlock(&z->lk);
135 	z->readq = q;
136 	qlock(&z->inlk);
137 	rwakeup(&z->rpcfork);
138 	qunlock(&z->lk);
139 
140 	while((p = _vtrecv(z)) != nil)
141 		if(_vtqsend(q, p) < 0){
142 			packetfree(p);
143 			break;
144 		}
145 	qunlock(&z->inlk);
146 	qlock(&z->lk);
147 	_vtqhangup(q);
148 	while((p = _vtnbqrecv(q)) != nil)
149 		packetfree(p);
150 	_vtqdecref(q);
151 	z->readq = nil;
152 	rwakeup(&z->rpcfork);
153 	qunlock(&z->lk);
154 	vthangup(z);
155 }
156 
157 void
vtsendproc(void * v)158 vtsendproc(void *v)
159 {
160 	Queue *q;
161 	Packet *p;
162 	VtConn *z;
163 
164 	z = v;
165 	q = _vtqalloc();
166 
167 	qlock(&z->lk);
168 	z->writeq = q;
169 	qlock(&z->outlk);
170 	rwakeup(&z->rpcfork);
171 	qunlock(&z->lk);
172 
173 	while((p = _vtqrecv(q)) != nil)
174 		if(_vtsend(z, p) < 0)
175 			break;
176 	qunlock(&z->outlk);
177 	qlock(&z->lk);
178 	_vtqhangup(q);
179 	while((p = _vtnbqrecv(q)) != nil)
180 		packetfree(p);
181 	_vtqdecref(q);
182 	z->writeq = nil;
183 	rwakeup(&z->rpcfork);
184 	qunlock(&z->lk);
185 	return;
186 }
187 
188 Packet*
vtrecv(VtConn * z)189 vtrecv(VtConn *z)
190 {
191 	Packet *p;
192 	Queue *q;
193 
194 	qlock(&z->lk);
195 	if(z->state != VtStateConnected){
196 		werrstr("not connected");
197 		qunlock(&z->lk);
198 		return nil;
199 	}
200 	if(z->readq){
201 		q = _vtqincref(z->readq);
202 		qunlock(&z->lk);
203 		p = _vtqrecv(q);
204 		_vtqdecref(q);
205 		return p;
206 	}
207 
208 	qlock(&z->inlk);
209 	qunlock(&z->lk);
210 	p = _vtrecv(z);
211 	qunlock(&z->inlk);
212 	if(!p)
213 		vthangup(z);
214 	return p;
215 }
216 
217 int
vtsend(VtConn * z,Packet * p)218 vtsend(VtConn *z, Packet *p)
219 {
220 	Queue *q;
221 
222 	qlock(&z->lk);
223 	if(z->state != VtStateConnected){
224 		packetfree(p);
225 		werrstr("not connected");
226 		qunlock(&z->lk);
227 		return -1;
228 	}
229 	if(z->writeq){
230 		q = _vtqincref(z->writeq);
231 		qunlock(&z->lk);
232 		if(_vtqsend(q, p) < 0){
233 			_vtqdecref(q);
234 			packetfree(p);
235 			return -1;
236 		}
237 		_vtqdecref(q);
238 		return 0;
239 	}
240 
241 	qlock(&z->outlk);
242 	qunlock(&z->lk);
243 	if(_vtsend(z, p) < 0){
244 		qunlock(&z->outlk);
245 		vthangup(z);
246 		return -1;
247 	}
248 	qunlock(&z->outlk);
249 	return 0;
250 }
251 
252