xref: /plan9/sys/src/libsunrpc/client.c (revision 0b459c2cb92b7c9d88818e9a2f72e678e5bc4553)
1 /*
2  * Sun RPC client.
3  */
4 #include <u.h>
5 #include <libc.h>
6 #include <thread.h>
7 #include <sunrpc.h>
8 
9 typedef struct Out Out;
10 struct Out
11 {
12 	char err[ERRMAX];	/* error string */
13 	Channel *creply;	/* send to finish rpc */
14 	uchar *p;			/* pending request packet */
15 	int n;				/* size of request */
16 	ulong tag;			/* flush tag of pending request */
17 	ulong xid;			/* xid of pending request */
18 	ulong st;			/* first send time */
19 	ulong t;			/* resend time */
20 	int nresend;		/* number of resends */
21 	SunRpc rpc;		/* response rpc */
22 };
23 
24 static void
25 udpThread(void *v)
26 {
27 	uchar *p, *buf;
28 	Ioproc *io;
29 	int n;
30 	SunClient *cli;
31 	enum { BufSize = 65536 };
32 
33 	cli = v;
34 	buf = emalloc(BufSize);
35 	io = ioproc();
36 	p = nil;
37 	for(;;){
38 		n = ioread(io, cli->fd, buf, BufSize);
39 		if(n <= 0)
40 			break;
41 		p = emalloc(4+n);
42 		memmove(p+4, buf, n);
43 		p[0] = n>>24;
44 		p[1] = n>>16;
45 		p[2] = n>>8;
46 		p[3] = n;
47 		if(sendp(cli->readchan, p) == 0)
48 			break;
49 		p = nil;
50 	}
51 	free(p);
52 	closeioproc(io);
53 	while(send(cli->dying, nil) == -1)
54 		;
55 }
56 
57 static void
58 netThread(void *v)
59 {
60 	uchar *p, buf[4];
61 	Ioproc *io;
62 	uint n, tot;
63 	int done;
64 	SunClient *cli;
65 
66 	cli = v;
67 	io = ioproc();
68 	tot = 0;
69 	p = nil;
70 	for(;;){
71 		n = ioreadn(io, cli->fd, buf, 4);
72 		if(n != 4)
73 			break;
74 		n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
75 		if(cli->chatty)
76 			fprint(2, "%.8ux...", n);
77 		done = n&0x80000000;
78 		n &= ~0x80000000;
79 		if(tot == 0){
80 			p = emalloc(4+n);
81 			tot = 4;
82 		}else
83 			p = erealloc(p, tot+n);
84 		if(ioreadn(io, cli->fd, p+tot, n) != n)
85 			break;
86 		tot += n;
87 		if(done){
88 			p[0] = tot>>24;
89 			p[1] = tot>>16;
90 			p[2] = tot>>8;
91 			p[3] = tot;
92 			if(sendp(cli->readchan, p) == 0)
93 				break;
94 			p = nil;
95 			tot = 0;
96 		}
97 	}
98 	free(p);
99 	closeioproc(io);
100 	while(send(cli->dying, 0) == -1)
101 		;
102 }
103 
104 static void
105 timerThread(void *v)
106 {
107 	Ioproc *io;
108 	SunClient *cli;
109 
110 	cli = v;
111 	io = ioproc();
112 	for(;;){
113 		if(iosleep(io, 200) < 0)
114 			break;
115 		if(sendul(cli->timerchan, 0) == 0)
116 			break;
117 	}
118 	closeioproc(io);
119 	while(send(cli->dying, 0) == -1)
120 		;
121 }
122 
123 static ulong
124 msec(void)
125 {
126 	return nsec()/1000000;
127 }
128 
/*
 * Time to wait before the next retransmission, given the base
 * interval rtt and the number of resends so far.
 *
 *	nresend <= 1		rtt
 *	nresend 2..3		2*rtt
 *	nresend 4..18		rtt << (nresend-2)
 *	nresend > 18		MaxWait
 *
 * The result never exceeds MaxWait (60000).  An rtt already at or
 * above MaxWait is answered before any scaling, so the shift cannot
 * wrap around to a small value: below MaxWait, rtt<<16 still fits
 * in 32 bits.
 */
static ulong
twait(ulong rtt, int nresend)
{
	enum { MaxWait = 60*1000 };
	ulong t;

	if(rtt >= MaxWait || nresend > 18)
		return MaxWait;

	t = rtt;
	if(nresend > 3)
		t <<= nresend-2;
	else if(nresend > 1)
		t *= 2;

	if(t > MaxWait)
		t = MaxWait;
	return t;
}
148 
149 static void
150 rpcMuxThread(void *v)
151 {
152 	uchar *buf, *p, *ep;
153 	int i, n, nout, mout;
154 	ulong t, xidgen, tag;
155 	Alt a[5];
156 	Out *o, **out;
157 	SunRpc rpc;
158 	SunClient *cli;
159 
160 	cli = v;
161 	mout = 16;
162 	nout = 0;
163 	out = emalloc(mout*sizeof(out[0]));
164 	xidgen = truerand();
165 
166 	a[0].op = CHANRCV;
167 	a[0].c = cli->rpcchan;
168 	a[0].v = &o;
169 	a[1].op = CHANNOP;
170 	a[1].c = cli->timerchan;
171 	a[1].v = nil;
172 	a[2].op = CHANRCV;
173 	a[2].c = cli->flushchan;
174 	a[2].v = &tag;
175 	a[3].op = CHANRCV;
176 	a[3].c = cli->readchan;
177 	a[3].v = &buf;
178 	a[4].op = CHANEND;
179 
180 	for(;;){
181 		switch(alt(a)){
182 		case 0:	/* rpcchan */
183 			if(o == nil)
184 				goto Done;
185 			cli->nsend++;
186 			/* set xid */
187 			o->xid = ++xidgen;
188 			if(cli->needcount)
189 				p = o->p+4;
190 			else
191 				p = o->p;
192 			p[0] = xidgen>>24;
193 			p[1] = xidgen>>16;
194 			p[2] = xidgen>>8;
195 			p[3] = xidgen;
196 			if(write(cli->fd, o->p, o->n) != o->n){
197 				free(o->p);
198 				o->p = nil;
199 				snprint(o->err, sizeof o->err, "write: %r");
200 				sendp(o->creply, 0);
201 				break;
202 			}
203 			if(nout >= mout){
204 				mout *= 2;
205 				out = erealloc(out, mout*sizeof(out[0]));
206 			}
207 			o->st = msec();
208 			o->nresend = 0;
209 			o->t = o->st + twait(cli->rtt.avg, 0);
210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
211 			out[nout++] = o;
212 			a[1].op = CHANRCV;
213 			break;
214 
215 		case 1:	/* timerchan */
216 			t = msec();
217 			for(i=0; i<nout; i++){
218 				o = out[i];
219 				if((int)(t - o->t) > 0){
220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
221 					if(cli->maxwait && t - o->st >= cli->maxwait){
222 						free(o->p);
223 						o->p = nil;
224 						strcpy(o->err, "timeout");
225 						sendp(o->creply, 0);
226 						out[i--] = out[--nout];
227 						continue;
228 					}
229 					cli->nresend++;
230 					o->nresend++;
231 					o->t = t + twait(cli->rtt.avg, o->nresend);
232 					if(write(cli->fd, o->p, o->n) != o->n){
233 						free(o->p);
234 						o->p = nil;
235 						snprint(o->err, sizeof o->err, "rewrite: %r");
236 						sendp(o->creply, 0);
237 						out[i--] = out[--nout];
238 						continue;
239 					}
240 				}
241 			}
242 			/* stop ticking if no work; rpcchan will turn it back on */
243 			if(nout == 0)
244 				a[1].op = CHANNOP;
245 			break;
246 
247 		case 2:	/* flushchan */
248 			for(i=0; i<nout; i++){
249 				o = out[i];
250 				if(o->tag == tag){
251 					out[i--] = out[--nout];
252 					strcpy(o->err, "flushed");
253 					free(o->p);
254 					o->p = nil;
255 					sendp(o->creply, 0);
256 				}
257 			}
258 			break;
259 
260 		case 3:	/* readchan */
261 			p = buf;
262 			n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
263 			p += 4;
264 			ep = p+n;
265 			if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
266 				fprint(2, "in: %.*H unpack failed\n", n, buf+4);
267 				free(buf);
268 				break;
269 			}
270 			if(cli->chatty)
271 				fprint(2, "in: %B\n", &rpc);
272 			if(rpc.iscall){
273 				fprint(2, "did not get reply\n");
274 				free(buf);
275 				break;
276 			}
277 			for(i=0; i<nout; i++)
278 				if(o->xid == rpc.xid)
279 					break;
280 			if(i==nout){
281 				if(cli->chatty) fprint(2, "did not find waiting request\n");
282 				free(buf);
283 				break;
284 			}
285 			out[i] = out[--nout];
286 			free(o->p);
287 			o->p = nil;
288 			if(rpc.status == SunSuccess){
289 				o->p = buf;
290 				o->rpc = rpc;
291 			}else{
292 				o->p = nil;
293 				free(buf);
294 				sunErrstr(rpc.status);
295 				rerrstr(o->err, sizeof o->err);
296 			}
297 			sendp(o->creply, 0);
298 			break;
299 		}
300 	}
301 Done:
302 	free(out);
303 	sendp(cli->dying, 0);
304 }
305 
306 SunClient*
307 sunDial(char *address)
308 {
309 	int fd;
310 	SunClient *cli;
311 
312 	if((fd = dial(address, 0, 0, 0)) < 0)
313 		return nil;
314 
315 	cli = emalloc(sizeof(SunClient));
316 	cli->fd = fd;
317 	cli->maxwait = 15000;
318 	cli->rtt.avg = 1000;
319 	cli->dying = chancreate(sizeof(void*), 0);
320 	cli->rpcchan = chancreate(sizeof(Out*), 0);
321 	cli->timerchan = chancreate(sizeof(ulong), 0);
322 	cli->flushchan = chancreate(sizeof(ulong), 0);
323 	cli->readchan = chancreate(sizeof(uchar*), 0);
324 	if(strstr(address, "udp!")){
325 		cli->needcount = 0;
326 		cli->nettid = threadcreate(udpThread, cli, SunStackSize);
327 		cli->timertid = threadcreate(timerThread, cli, SunStackSize);
328 	}else{
329 		cli->needcount = 1;
330 		cli->nettid = threadcreate(netThread, cli, SunStackSize);
331 		/* assume reliable: don't need timer */
332 		/* BUG: netThread should know how to redial */
333 	}
334 	threadcreate(rpcMuxThread, cli, SunStackSize);
335 
336 	return cli;
337 }
338 
339 void
340 sunClientClose(SunClient *cli)
341 {
342 	int n;
343 
344 	/*
345 	 * Threadints get you out of any stuck system calls
346 	 * or thread rendezvouses, but do nothing if the thread
347 	 * is in the ready state.  Keep interrupting until it takes.
348 	 */
349 	n = 0;
350 	if(!cli->timertid)
351 		n++;
352 	while(n < 2){
353 		threadint(cli->nettid);
354 		if(cli->timertid)
355 			threadint(cli->timertid);
356 		yield();
357 		while(nbrecv(cli->dying, nil) == 1)
358 			n++;
359 	}
360 
361 	sendp(cli->rpcchan, 0);
362 	recvp(cli->dying);
363 
364 	/* everyone's gone: clean up */
365 	close(cli->fd);
366 	chanfree(cli->flushchan);
367 	chanfree(cli->readchan);
368 	chanfree(cli->timerchan);
369 	free(cli);
370 }
371 
372 void
373 sunClientFlushRpc(SunClient *cli, ulong tag)
374 {
375 	sendul(cli->flushchan, tag);
376 }
377 
378 void
379 sunClientProg(SunClient *cli, SunProg *p)
380 {
381 	if(cli->nprog%16 == 0)
382 		cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
383 	cli->prog[cli->nprog++] = p;
384 }
385 
386 int
387 sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
388 {
389 	uchar *bp, *p, *ep;
390 	int i, n1, n2, n, nn;
391 	Out o;
392 	SunProg *prog;
393 	SunStatus ok;
394 
395 	for(i=0; i<cli->nprog; i++)
396 		if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
397 			break;
398 	if(i==cli->nprog){
399 		werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
400 		return -1;
401 	}
402 	prog = cli->prog[i];
403 
404 	if(cli->chatty){
405 		fprint(2, "out: %B\n", &tx->rpc);
406 		fprint(2, "\t%C\n", tx);
407 	}
408 
409 	n1 = sunRpcSize(&tx->rpc);
410 	n2 = sunCallSize(prog, tx);
411 
412 	n = n1+n2;
413 	if(cli->needcount)
414 		n += 4;
415 
416 	bp = emalloc(n);
417 	ep = bp+n;
418 	p = bp;
419 	if(cli->needcount){
420 		nn = n-4;
421 		p[0] = (nn>>24)|0x80;
422 		p[1] = nn>>16;
423 		p[2] = nn>>8;
424 		p[3] = nn;
425 		p += 4;
426 	}
427 	if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
428 	|| (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
429 		sunErrstr(ok);
430 		free(bp);
431 		return -1;
432 	}
433 	if(p != ep){
434 		werrstr("rpc: packet size mismatch");
435 		free(bp);
436 		return -1;
437 	}
438 
439 	memset(&o, 0, sizeof o);
440 	o.creply = chancreate(sizeof(void*), 0);
441 	o.tag = tag;
442 	o.p = bp;
443 	o.n = n;
444 
445 	sendp(cli->rpcchan, &o);
446 	recvp(o.creply);
447 	chanfree(o.creply);
448 
449 	if(o.p == nil){
450 		werrstr("%s", o.err);
451 		return -1;
452 	}
453 
454 	p = o.rpc.data;
455 	ep = p+o.rpc.ndata;
456 	rx->rpc = o.rpc;
457 	rx->rpc.proc = tx->rpc.proc;
458 	rx->rpc.prog = tx->rpc.prog;
459 	rx->rpc.vers = tx->rpc.vers;
460 	rx->type = (rx->rpc.proc<<1)|1;
461 	if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
462 		sunErrstr(ok);
463 		werrstr("unpack: %r");
464 		free(o.p);
465 		return -1;
466 	}
467 
468 	if(cli->chatty){
469 		fprint(2, "in: %B\n", &rx->rpc);
470 		fprint(2, "in:\t%C\n", rx);
471 	}
472 
473 	if(tofree)
474 		*tofree = o.p;
475 	else
476 		free(o.p);
477 
478 	return 0;
479 }
480