xref: /plan9/sys/src/libsunrpc/client.c (revision 6ff5e9135082ce281d25c68a5544eef8249d480c)
1 /*
2  * Sun RPC client.
3  */
4 #include <u.h>
5 #include <libc.h>
6 #include <thread.h>
7 #include <sunrpc.h>
8 
9 typedef struct Out Out;
10 struct Out
11 {
12 	char err[ERRMAX];	/* error string */
13 	Channel *creply;	/* send to finish rpc */
14 	uchar *p;			/* pending request packet */
15 	int n;				/* size of request */
16 	ulong tag;			/* flush tag of pending request */
17 	ulong xid;			/* xid of pending request */
18 	ulong st;			/* first send time */
19 	ulong t;			/* resend time */
20 	int nresend;		/* number of resends */
21 	SunRpc rpc;		/* response rpc */
22 };
23 
24 static void
udpThread(void * v)25 udpThread(void *v)
26 {
27 	uchar *p, *buf;
28 	Ioproc *io;
29 	int n;
30 	SunClient *cli;
31 	enum { BufSize = 65536 };
32 
33 	cli = v;
34 	buf = emalloc(BufSize);
35 	io = ioproc();
36 	p = nil;
37 	for(;;){
38 		n = ioread(io, cli->fd, buf, BufSize);
39 		if(n <= 0)
40 			break;
41 		p = emalloc(4+n);
42 		memmove(p+4, buf, n);
43 		p[0] = n>>24;
44 		p[1] = n>>16;
45 		p[2] = n>>8;
46 		p[3] = n;
47 		if(sendp(cli->readchan, p) == 0)
48 			break;
49 		p = nil;
50 	}
51 	free(p);
52 	closeioproc(io);
53 	while(send(cli->dying, nil) == -1)
54 		;
55 }
56 
57 static void
netThread(void * v)58 netThread(void *v)
59 {
60 	uchar *p, buf[4];
61 	Ioproc *io;
62 	uint n, tot;
63 	int done;
64 	SunClient *cli;
65 
66 	cli = v;
67 	io = ioproc();
68 	tot = 0;
69 	p = nil;
70 	for(;;){
71 		n = ioreadn(io, cli->fd, buf, 4);
72 		if(n != 4)
73 			break;
74 		n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
75 		if(cli->chatty)
76 			fprint(2, "%.8ux...", n);
77 		done = n&0x80000000;
78 		n &= ~0x80000000;
79 		if(tot == 0){
80 			p = emalloc(4+n);
81 			tot = 4;
82 		}else
83 			p = erealloc(p, tot+n);
84 		if(ioreadn(io, cli->fd, p+tot, n) != n)
85 			break;
86 		tot += n;
87 		if(done){
88 			p[0] = tot>>24;
89 			p[1] = tot>>16;
90 			p[2] = tot>>8;
91 			p[3] = tot;
92 			if(sendp(cli->readchan, p) == 0)
93 				break;
94 			p = nil;
95 			tot = 0;
96 		}
97 	}
98 	free(p);
99 	closeioproc(io);
100 	while(send(cli->dying, 0) == -1)
101 		;
102 }
103 
104 static void
timerThread(void * v)105 timerThread(void *v)
106 {
107 	Ioproc *io;
108 	SunClient *cli;
109 
110 	cli = v;
111 	io = ioproc();
112 	for(;;){
113 		if(iosleep(io, 200) < 0)
114 			break;
115 		if(sendul(cli->timerchan, 0) == 0)
116 			break;
117 	}
118 	closeioproc(io);
119 	while(send(cli->dying, 0) == -1)
120 		;
121 }
122 
123 static ulong
msec(void)124 msec(void)
125 {
126 	return nsec()/1000000;
127 }
128 
129 static ulong
twait(ulong rtt,int nresend)130 twait(ulong rtt, int nresend)
131 {
132 	ulong t;
133 
134 	t = rtt;
135 	if(nresend <= 1)
136 		{}
137 	else if(nresend <= 3)
138 		t *= 2;
139 	else if(nresend <= 18)
140 		t <<= nresend-2;
141 	else
142 		t = 60*1000;
143 	if(t > 60*1000)
144 		t = 60*1000;
145 
146 	return t;
147 }
148 
149 static void
rpcMuxThread(void * v)150 rpcMuxThread(void *v)
151 {
152 	uchar *buf, *p, *ep;
153 	int i, n, nout, mout;
154 	ulong t, xidgen, tag;
155 	Alt a[5];
156 	Out *o, **out;
157 	SunRpc rpc;
158 	SunClient *cli;
159 
160 	cli = v;
161 	mout = 16;
162 	nout = 0;
163 	out = emalloc(mout*sizeof(out[0]));
164 	xidgen = truerand();
165 
166 	a[0].op = CHANRCV;
167 	a[0].c = cli->rpcchan;
168 	a[0].v = &o;
169 	a[1].op = CHANNOP;
170 	a[1].c = cli->timerchan;
171 	a[1].v = nil;
172 	a[2].op = CHANRCV;
173 	a[2].c = cli->flushchan;
174 	a[2].v = &tag;
175 	a[3].op = CHANRCV;
176 	a[3].c = cli->readchan;
177 	a[3].v = &buf;
178 	a[4].op = CHANEND;
179 
180 	for(;;){
181 		switch(alt(a)){
182 		case 0:	/* o = <-rpcchan */
183 			if(o == nil)
184 				goto Done;
185 			cli->nsend++;
186 			/* set xid */
187 			o->xid = ++xidgen;
188 			if(cli->needcount)
189 				p = o->p+4;
190 			else
191 				p = o->p;
192 			p[0] = xidgen>>24;
193 			p[1] = xidgen>>16;
194 			p[2] = xidgen>>8;
195 			p[3] = xidgen;
196 			if(write(cli->fd, o->p, o->n) != o->n){
197 				free(o->p);
198 				o->p = nil;
199 				snprint(o->err, sizeof o->err, "write: %r");
200 				sendp(o->creply, 0);
201 				break;
202 			}
203 			if(nout >= mout){
204 				mout *= 2;
205 				out = erealloc(out, mout*sizeof(out[0]));
206 			}
207 			o->st = msec();
208 			o->nresend = 0;
209 			o->t = o->st + twait(cli->rtt.avg, 0);
210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
211 			out[nout++] = o;
212 			a[1].op = CHANRCV;
213 			break;
214 
215 		case 1:	/* <-timerchan */
216 			t = msec();
217 			for(i=0; i<nout; i++){
218 				o = out[i];
219 				if((int)(t - o->t) > 0){
220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
221 					if(cli->maxwait && t - o->st >= cli->maxwait){
222 						free(o->p);
223 						o->p = nil;
224 						strcpy(o->err, "timeout");
225 						sendp(o->creply, 0);
226 						out[i--] = out[--nout];
227 						continue;
228 					}
229 					cli->nresend++;
230 					o->nresend++;
231 					o->t = t + twait(cli->rtt.avg, o->nresend);
232 					if(write(cli->fd, o->p, o->n) != o->n){
233 						free(o->p);
234 						o->p = nil;
235 						snprint(o->err, sizeof o->err, "rewrite: %r");
236 						sendp(o->creply, 0);
237 						out[i--] = out[--nout];
238 						continue;
239 					}
240 				}
241 			}
242 			/* stop ticking if no work; rpcchan will turn it back on */
243 			if(nout == 0)
244 				a[1].op = CHANNOP;
245 			break;
246 
247 		case 2:	/* tag = <-flushchan */
248 			for(i=0; i<nout; i++){
249 				o = out[i];
250 				if(o->tag == tag){
251 					out[i--] = out[--nout];
252 					strcpy(o->err, "flushed");
253 					free(o->p);
254 					o->p = nil;
255 					sendp(o->creply, 0);
256 				}
257 			}
258 			break;
259 
260 		case 3:	/* buf = <-readchan */
261 			p = buf;
262 			n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
263 			p += 4;
264 			ep = p+n;
265 			if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
266 				fprint(2, "in: %.*H unpack failed\n", n, buf+4);
267 				free(buf);
268 				break;
269 			}
270 			if(cli->chatty)
271 				fprint(2, "in: %B\n", &rpc);
272 			if(rpc.iscall){
273 				fprint(2, "did not get reply\n");
274 				free(buf);
275 				break;
276 			}
277 			o = nil;
278 			for(i=0; i<nout; i++){
279 				o = out[i];
280 				if(o->xid == rpc.xid)
281 					break;
282 			}
283 			if(i==nout){
284 				if(cli->chatty) fprint(2, "did not find waiting request\n");
285 				free(buf);
286 				break;
287 			}
288 			out[i] = out[--nout];
289 			free(o->p);
290 			o->p = nil;
291 			if(rpc.status == SunSuccess){
292 				o->p = buf;
293 				o->rpc = rpc;
294 			}else{
295 				o->p = nil;
296 				free(buf);
297 				sunErrstr(rpc.status);
298 				rerrstr(o->err, sizeof o->err);
299 			}
300 			sendp(o->creply, 0);
301 			break;
302 		}
303 	}
304 Done:
305 	free(out);
306 	sendp(cli->dying, 0);
307 }
308 
309 SunClient*
sunDial(char * address)310 sunDial(char *address)
311 {
312 	int fd;
313 	SunClient *cli;
314 
315 	if((fd = dial(address, 0, 0, 0)) < 0)
316 		return nil;
317 
318 	cli = emalloc(sizeof(SunClient));
319 	cli->fd = fd;
320 	cli->maxwait = 15000;
321 	cli->rtt.avg = 1000;
322 	cli->dying = chancreate(sizeof(void*), 0);
323 	cli->rpcchan = chancreate(sizeof(Out*), 0);
324 	cli->timerchan = chancreate(sizeof(ulong), 0);
325 	cli->flushchan = chancreate(sizeof(ulong), 0);
326 	cli->readchan = chancreate(sizeof(uchar*), 0);
327 	if(strstr(address, "udp!")){
328 		cli->needcount = 0;
329 		cli->nettid = threadcreate(udpThread, cli, SunStackSize);
330 		cli->timertid = threadcreate(timerThread, cli, SunStackSize);
331 	}else{
332 		cli->needcount = 1;
333 		cli->nettid = threadcreate(netThread, cli, SunStackSize);
334 		/* assume reliable: don't need timer */
335 		/* BUG: netThread should know how to redial */
336 	}
337 	threadcreate(rpcMuxThread, cli, SunStackSize);
338 
339 	return cli;
340 }
341 
342 void
sunClientClose(SunClient * cli)343 sunClientClose(SunClient *cli)
344 {
345 	int n;
346 
347 	/*
348 	 * Threadints get you out of any stuck system calls
349 	 * or thread rendezvouses, but do nothing if the thread
350 	 * is in the ready state.  Keep interrupting until it takes.
351 	 */
352 	n = 0;
353 	if(!cli->timertid)
354 		n++;
355 	while(n < 2){
356 		threadint(cli->nettid);
357 		if(cli->timertid)
358 			threadint(cli->timertid);
359 		yield();
360 		while(nbrecv(cli->dying, nil) == 1)
361 			n++;
362 	}
363 
364 	sendp(cli->rpcchan, 0);
365 	recvp(cli->dying);
366 
367 	/* everyone's gone: clean up */
368 	close(cli->fd);
369 	chanfree(cli->flushchan);
370 	chanfree(cli->readchan);
371 	chanfree(cli->timerchan);
372 	free(cli);
373 }
374 
375 void
sunClientFlushRpc(SunClient * cli,ulong tag)376 sunClientFlushRpc(SunClient *cli, ulong tag)
377 {
378 	sendul(cli->flushchan, tag);
379 }
380 
381 void
sunClientProg(SunClient * cli,SunProg * p)382 sunClientProg(SunClient *cli, SunProg *p)
383 {
384 	if(cli->nprog%16 == 0)
385 		cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
386 	cli->prog[cli->nprog++] = p;
387 }
388 
389 int
sunClientRpc(SunClient * cli,ulong tag,SunCall * tx,SunCall * rx,uchar ** tofree)390 sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
391 {
392 	uchar *bp, *p, *ep;
393 	int i, n1, n2, n, nn;
394 	Out o;
395 	SunProg *prog;
396 	SunStatus ok;
397 
398 	for(i=0; i<cli->nprog; i++)
399 		if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
400 			break;
401 	if(i==cli->nprog){
402 		werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
403 		return -1;
404 	}
405 	prog = cli->prog[i];
406 
407 	if(cli->chatty){
408 		fprint(2, "out: %B\n", &tx->rpc);
409 		fprint(2, "\t%C\n", tx);
410 	}
411 
412 	n1 = sunRpcSize(&tx->rpc);
413 	n2 = sunCallSize(prog, tx);
414 
415 	n = n1+n2;
416 	if(cli->needcount)
417 		n += 4;
418 
419 	bp = emalloc(n);
420 	ep = bp+n;
421 	p = bp;
422 	if(cli->needcount){
423 		nn = n-4;
424 		p[0] = (nn>>24)|0x80;
425 		p[1] = nn>>16;
426 		p[2] = nn>>8;
427 		p[3] = nn;
428 		p += 4;
429 	}
430 	if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
431 	|| (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
432 		sunErrstr(ok);
433 		free(bp);
434 		return -1;
435 	}
436 	if(p != ep){
437 		werrstr("rpc: packet size mismatch");
438 		free(bp);
439 		return -1;
440 	}
441 
442 	memset(&o, 0, sizeof o);
443 	o.creply = chancreate(sizeof(void*), 0);
444 	o.tag = tag;
445 	o.p = bp;
446 	o.n = n;
447 
448 	sendp(cli->rpcchan, &o);
449 	recvp(o.creply);
450 	chanfree(o.creply);
451 
452 	if(o.p == nil){
453 		werrstr("%s", o.err);
454 		return -1;
455 	}
456 
457 	p = o.rpc.data;
458 	ep = p+o.rpc.ndata;
459 	rx->rpc = o.rpc;
460 	rx->rpc.proc = tx->rpc.proc;
461 	rx->rpc.prog = tx->rpc.prog;
462 	rx->rpc.vers = tx->rpc.vers;
463 	rx->type = (rx->rpc.proc<<1)|1;
464 	if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
465 		sunErrstr(ok);
466 		werrstr("unpack: %r");
467 		free(o.p);
468 		return -1;
469 	}
470 
471 	if(cli->chatty){
472 		fprint(2, "in: %B\n", &rx->rpc);
473 		fprint(2, "in:\t%C\n", rx);
474 	}
475 
476 	if(tofree)
477 		*tofree = o.p;
478 	else
479 		free(o.p);
480 
481 	return 0;
482 }
483