1 /*
2 * Sun RPC client.
3 */
4 #include <u.h>
5 #include <libc.h>
6 #include <thread.h>
7 #include <sunrpc.h>
8
9 typedef struct Out Out;
10 struct Out
11 {
12 char err[ERRMAX]; /* error string */
13 Channel *creply; /* send to finish rpc */
14 uchar *p; /* pending request packet */
15 int n; /* size of request */
16 ulong tag; /* flush tag of pending request */
17 ulong xid; /* xid of pending request */
18 ulong st; /* first send time */
19 ulong t; /* resend time */
20 int nresend; /* number of resends */
21 SunRpc rpc; /* response rpc */
22 };
23
24 static void
udpThread(void * v)25 udpThread(void *v)
26 {
27 uchar *p, *buf;
28 Ioproc *io;
29 int n;
30 SunClient *cli;
31 enum { BufSize = 65536 };
32
33 cli = v;
34 buf = emalloc(BufSize);
35 io = ioproc();
36 p = nil;
37 for(;;){
38 n = ioread(io, cli->fd, buf, BufSize);
39 if(n <= 0)
40 break;
41 p = emalloc(4+n);
42 memmove(p+4, buf, n);
43 p[0] = n>>24;
44 p[1] = n>>16;
45 p[2] = n>>8;
46 p[3] = n;
47 if(sendp(cli->readchan, p) == 0)
48 break;
49 p = nil;
50 }
51 free(p);
52 closeioproc(io);
53 while(send(cli->dying, nil) == -1)
54 ;
55 }
56
57 static void
netThread(void * v)58 netThread(void *v)
59 {
60 uchar *p, buf[4];
61 Ioproc *io;
62 uint n, tot;
63 int done;
64 SunClient *cli;
65
66 cli = v;
67 io = ioproc();
68 tot = 0;
69 p = nil;
70 for(;;){
71 n = ioreadn(io, cli->fd, buf, 4);
72 if(n != 4)
73 break;
74 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
75 if(cli->chatty)
76 fprint(2, "%.8ux...", n);
77 done = n&0x80000000;
78 n &= ~0x80000000;
79 if(tot == 0){
80 p = emalloc(4+n);
81 tot = 4;
82 }else
83 p = erealloc(p, tot+n);
84 if(ioreadn(io, cli->fd, p+tot, n) != n)
85 break;
86 tot += n;
87 if(done){
88 p[0] = tot>>24;
89 p[1] = tot>>16;
90 p[2] = tot>>8;
91 p[3] = tot;
92 if(sendp(cli->readchan, p) == 0)
93 break;
94 p = nil;
95 tot = 0;
96 }
97 }
98 free(p);
99 closeioproc(io);
100 while(send(cli->dying, 0) == -1)
101 ;
102 }
103
104 static void
timerThread(void * v)105 timerThread(void *v)
106 {
107 Ioproc *io;
108 SunClient *cli;
109
110 cli = v;
111 io = ioproc();
112 for(;;){
113 if(iosleep(io, 200) < 0)
114 break;
115 if(sendul(cli->timerchan, 0) == 0)
116 break;
117 }
118 closeioproc(io);
119 while(send(cli->dying, 0) == -1)
120 ;
121 }
122
123 static ulong
msec(void)124 msec(void)
125 {
126 return nsec()/1000000;
127 }
128
129 static ulong
twait(ulong rtt,int nresend)130 twait(ulong rtt, int nresend)
131 {
132 ulong t;
133
134 t = rtt;
135 if(nresend <= 1)
136 {}
137 else if(nresend <= 3)
138 t *= 2;
139 else if(nresend <= 18)
140 t <<= nresend-2;
141 else
142 t = 60*1000;
143 if(t > 60*1000)
144 t = 60*1000;
145
146 return t;
147 }
148
149 static void
rpcMuxThread(void * v)150 rpcMuxThread(void *v)
151 {
152 uchar *buf, *p, *ep;
153 int i, n, nout, mout;
154 ulong t, xidgen, tag;
155 Alt a[5];
156 Out *o, **out;
157 SunRpc rpc;
158 SunClient *cli;
159
160 cli = v;
161 mout = 16;
162 nout = 0;
163 out = emalloc(mout*sizeof(out[0]));
164 xidgen = truerand();
165
166 a[0].op = CHANRCV;
167 a[0].c = cli->rpcchan;
168 a[0].v = &o;
169 a[1].op = CHANNOP;
170 a[1].c = cli->timerchan;
171 a[1].v = nil;
172 a[2].op = CHANRCV;
173 a[2].c = cli->flushchan;
174 a[2].v = &tag;
175 a[3].op = CHANRCV;
176 a[3].c = cli->readchan;
177 a[3].v = &buf;
178 a[4].op = CHANEND;
179
180 for(;;){
181 switch(alt(a)){
182 case 0: /* o = <-rpcchan */
183 if(o == nil)
184 goto Done;
185 cli->nsend++;
186 /* set xid */
187 o->xid = ++xidgen;
188 if(cli->needcount)
189 p = o->p+4;
190 else
191 p = o->p;
192 p[0] = xidgen>>24;
193 p[1] = xidgen>>16;
194 p[2] = xidgen>>8;
195 p[3] = xidgen;
196 if(write(cli->fd, o->p, o->n) != o->n){
197 free(o->p);
198 o->p = nil;
199 snprint(o->err, sizeof o->err, "write: %r");
200 sendp(o->creply, 0);
201 break;
202 }
203 if(nout >= mout){
204 mout *= 2;
205 out = erealloc(out, mout*sizeof(out[0]));
206 }
207 o->st = msec();
208 o->nresend = 0;
209 o->t = o->st + twait(cli->rtt.avg, 0);
210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
211 out[nout++] = o;
212 a[1].op = CHANRCV;
213 break;
214
215 case 1: /* <-timerchan */
216 t = msec();
217 for(i=0; i<nout; i++){
218 o = out[i];
219 if((int)(t - o->t) > 0){
220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
221 if(cli->maxwait && t - o->st >= cli->maxwait){
222 free(o->p);
223 o->p = nil;
224 strcpy(o->err, "timeout");
225 sendp(o->creply, 0);
226 out[i--] = out[--nout];
227 continue;
228 }
229 cli->nresend++;
230 o->nresend++;
231 o->t = t + twait(cli->rtt.avg, o->nresend);
232 if(write(cli->fd, o->p, o->n) != o->n){
233 free(o->p);
234 o->p = nil;
235 snprint(o->err, sizeof o->err, "rewrite: %r");
236 sendp(o->creply, 0);
237 out[i--] = out[--nout];
238 continue;
239 }
240 }
241 }
242 /* stop ticking if no work; rpcchan will turn it back on */
243 if(nout == 0)
244 a[1].op = CHANNOP;
245 break;
246
247 case 2: /* tag = <-flushchan */
248 for(i=0; i<nout; i++){
249 o = out[i];
250 if(o->tag == tag){
251 out[i--] = out[--nout];
252 strcpy(o->err, "flushed");
253 free(o->p);
254 o->p = nil;
255 sendp(o->creply, 0);
256 }
257 }
258 break;
259
260 case 3: /* buf = <-readchan */
261 p = buf;
262 n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
263 p += 4;
264 ep = p+n;
265 if(sunRpcUnpack(p, ep, &p, &rpc) < 0){
266 fprint(2, "in: %.*H unpack failed\n", n, buf+4);
267 free(buf);
268 break;
269 }
270 if(cli->chatty)
271 fprint(2, "in: %B\n", &rpc);
272 if(rpc.iscall){
273 fprint(2, "did not get reply\n");
274 free(buf);
275 break;
276 }
277 o = nil;
278 for(i=0; i<nout; i++){
279 o = out[i];
280 if(o->xid == rpc.xid)
281 break;
282 }
283 if(i==nout){
284 if(cli->chatty) fprint(2, "did not find waiting request\n");
285 free(buf);
286 break;
287 }
288 out[i] = out[--nout];
289 free(o->p);
290 o->p = nil;
291 if(rpc.status == SunSuccess){
292 o->p = buf;
293 o->rpc = rpc;
294 }else{
295 o->p = nil;
296 free(buf);
297 sunErrstr(rpc.status);
298 rerrstr(o->err, sizeof o->err);
299 }
300 sendp(o->creply, 0);
301 break;
302 }
303 }
304 Done:
305 free(out);
306 sendp(cli->dying, 0);
307 }
308
309 SunClient*
sunDial(char * address)310 sunDial(char *address)
311 {
312 int fd;
313 SunClient *cli;
314
315 if((fd = dial(address, 0, 0, 0)) < 0)
316 return nil;
317
318 cli = emalloc(sizeof(SunClient));
319 cli->fd = fd;
320 cli->maxwait = 15000;
321 cli->rtt.avg = 1000;
322 cli->dying = chancreate(sizeof(void*), 0);
323 cli->rpcchan = chancreate(sizeof(Out*), 0);
324 cli->timerchan = chancreate(sizeof(ulong), 0);
325 cli->flushchan = chancreate(sizeof(ulong), 0);
326 cli->readchan = chancreate(sizeof(uchar*), 0);
327 if(strstr(address, "udp!")){
328 cli->needcount = 0;
329 cli->nettid = threadcreate(udpThread, cli, SunStackSize);
330 cli->timertid = threadcreate(timerThread, cli, SunStackSize);
331 }else{
332 cli->needcount = 1;
333 cli->nettid = threadcreate(netThread, cli, SunStackSize);
334 /* assume reliable: don't need timer */
335 /* BUG: netThread should know how to redial */
336 }
337 threadcreate(rpcMuxThread, cli, SunStackSize);
338
339 return cli;
340 }
341
342 void
sunClientClose(SunClient * cli)343 sunClientClose(SunClient *cli)
344 {
345 int n;
346
347 /*
348 * Threadints get you out of any stuck system calls
349 * or thread rendezvouses, but do nothing if the thread
350 * is in the ready state. Keep interrupting until it takes.
351 */
352 n = 0;
353 if(!cli->timertid)
354 n++;
355 while(n < 2){
356 threadint(cli->nettid);
357 if(cli->timertid)
358 threadint(cli->timertid);
359 yield();
360 while(nbrecv(cli->dying, nil) == 1)
361 n++;
362 }
363
364 sendp(cli->rpcchan, 0);
365 recvp(cli->dying);
366
367 /* everyone's gone: clean up */
368 close(cli->fd);
369 chanfree(cli->flushchan);
370 chanfree(cli->readchan);
371 chanfree(cli->timerchan);
372 free(cli);
373 }
374
375 void
sunClientFlushRpc(SunClient * cli,ulong tag)376 sunClientFlushRpc(SunClient *cli, ulong tag)
377 {
378 sendul(cli->flushchan, tag);
379 }
380
381 void
sunClientProg(SunClient * cli,SunProg * p)382 sunClientProg(SunClient *cli, SunProg *p)
383 {
384 if(cli->nprog%16 == 0)
385 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
386 cli->prog[cli->nprog++] = p;
387 }
388
389 int
sunClientRpc(SunClient * cli,ulong tag,SunCall * tx,SunCall * rx,uchar ** tofree)390 sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
391 {
392 uchar *bp, *p, *ep;
393 int i, n1, n2, n, nn;
394 Out o;
395 SunProg *prog;
396 SunStatus ok;
397
398 for(i=0; i<cli->nprog; i++)
399 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
400 break;
401 if(i==cli->nprog){
402 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
403 return -1;
404 }
405 prog = cli->prog[i];
406
407 if(cli->chatty){
408 fprint(2, "out: %B\n", &tx->rpc);
409 fprint(2, "\t%C\n", tx);
410 }
411
412 n1 = sunRpcSize(&tx->rpc);
413 n2 = sunCallSize(prog, tx);
414
415 n = n1+n2;
416 if(cli->needcount)
417 n += 4;
418
419 bp = emalloc(n);
420 ep = bp+n;
421 p = bp;
422 if(cli->needcount){
423 nn = n-4;
424 p[0] = (nn>>24)|0x80;
425 p[1] = nn>>16;
426 p[2] = nn>>8;
427 p[3] = nn;
428 p += 4;
429 }
430 if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess
431 || (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){
432 sunErrstr(ok);
433 free(bp);
434 return -1;
435 }
436 if(p != ep){
437 werrstr("rpc: packet size mismatch");
438 free(bp);
439 return -1;
440 }
441
442 memset(&o, 0, sizeof o);
443 o.creply = chancreate(sizeof(void*), 0);
444 o.tag = tag;
445 o.p = bp;
446 o.n = n;
447
448 sendp(cli->rpcchan, &o);
449 recvp(o.creply);
450 chanfree(o.creply);
451
452 if(o.p == nil){
453 werrstr("%s", o.err);
454 return -1;
455 }
456
457 p = o.rpc.data;
458 ep = p+o.rpc.ndata;
459 rx->rpc = o.rpc;
460 rx->rpc.proc = tx->rpc.proc;
461 rx->rpc.prog = tx->rpc.prog;
462 rx->rpc.vers = tx->rpc.vers;
463 rx->type = (rx->rpc.proc<<1)|1;
464 if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){
465 sunErrstr(ok);
466 werrstr("unpack: %r");
467 free(o.p);
468 return -1;
469 }
470
471 if(cli->chatty){
472 fprint(2, "in: %B\n", &rx->rpc);
473 fprint(2, "in:\t%C\n", rx);
474 }
475
476 if(tofree)
477 *tofree = o.p;
478 else
479 free(o.p);
480
481 return 0;
482 }
483