1 /* 2 * Sun RPC client. 3 */ 4 #include <u.h> 5 #include <libc.h> 6 #include <thread.h> 7 #include <sunrpc.h> 8 9 typedef struct Out Out; 10 struct Out 11 { 12 char err[ERRMAX]; /* error string */ 13 Channel *creply; /* send to finish rpc */ 14 uchar *p; /* pending request packet */ 15 int n; /* size of request */ 16 ulong tag; /* flush tag of pending request */ 17 ulong xid; /* xid of pending request */ 18 ulong st; /* first send time */ 19 ulong t; /* resend time */ 20 int nresend; /* number of resends */ 21 SunRpc rpc; /* response rpc */ 22 }; 23 24 static void 25 udpThread(void *v) 26 { 27 uchar *p, *buf; 28 Ioproc *io; 29 int n; 30 SunClient *cli; 31 enum { BufSize = 65536 }; 32 33 cli = v; 34 buf = emalloc(BufSize); 35 io = ioproc(); 36 p = nil; 37 for(;;){ 38 n = ioread(io, cli->fd, buf, BufSize); 39 if(n <= 0) 40 break; 41 p = emalloc(4+n); 42 memmove(p+4, buf, n); 43 p[0] = n>>24; 44 p[1] = n>>16; 45 p[2] = n>>8; 46 p[3] = n; 47 if(sendp(cli->readchan, p) == 0) 48 break; 49 p = nil; 50 } 51 free(p); 52 closeioproc(io); 53 while(send(cli->dying, nil) == -1) 54 ; 55 } 56 57 static void 58 netThread(void *v) 59 { 60 uchar *p, buf[4]; 61 Ioproc *io; 62 uint n, tot; 63 int done; 64 SunClient *cli; 65 66 cli = v; 67 io = ioproc(); 68 tot = 0; 69 p = nil; 70 for(;;){ 71 n = ioreadn(io, cli->fd, buf, 4); 72 if(n != 4) 73 break; 74 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3]; 75 if(cli->chatty) 76 fprint(2, "%.8ux...", n); 77 done = n&0x80000000; 78 n &= ~0x80000000; 79 if(tot == 0){ 80 p = emalloc(4+n); 81 tot = 4; 82 }else 83 p = erealloc(p, tot+n); 84 if(ioreadn(io, cli->fd, p+tot, n) != n) 85 break; 86 tot += n; 87 if(done){ 88 p[0] = tot>>24; 89 p[1] = tot>>16; 90 p[2] = tot>>8; 91 p[3] = tot; 92 if(sendp(cli->readchan, p) == 0) 93 break; 94 p = nil; 95 tot = 0; 96 } 97 } 98 free(p); 99 closeioproc(io); 100 while(send(cli->dying, 0) == -1) 101 ; 102 } 103 104 static void 105 timerThread(void *v) 106 { 107 Ioproc *io; 108 SunClient *cli; 109 110 cli = v; 111 io = ioproc(); 112 for(;;){ 113 if(iosleep(io, 200) < 0) 114 break; 115 if(sendul(cli->timerchan, 0) == 0) 116 break; 117 } 118 closeioproc(io); 119 while(send(cli->dying, 0) == -1) 120 ; 121 } 122 123 static ulong 124 msec(void) 125 { 126 return nsec()/1000000; 127 } 128 129 static ulong 130 twait(ulong rtt, int nresend) 131 { 132 ulong t; 133 134 t = rtt; 135 if(nresend <= 1) 136 {} 137 else if(nresend <= 3) 138 t *= 2; 139 else if(nresend <= 18) 140 t <<= nresend-2; 141 else 142 t = 60*1000; 143 if(t > 60*1000) 144 t = 60*1000; 145 146 return t; 147 } 148 149 static void 150 rpcMuxThread(void *v) 151 { 152 uchar *buf, *p, *ep; 153 int i, n, nout, mout; 154 ulong t, xidgen, tag; 155 Alt a[5]; 156 Out *o, **out; 157 SunRpc rpc; 158 SunClient *cli; 159 160 cli = v; 161 mout = 16; 162 nout = 0; 163 out = emalloc(mout*sizeof(out[0])); 164 xidgen = truerand(); 165 166 a[0].op = CHANRCV; 167 a[0].c = cli->rpcchan; 168 a[0].v = &o; 169 a[1].op = CHANNOP; 170 a[1].c = cli->timerchan; 171 a[1].v = nil; 172 a[2].op = CHANRCV; 173 a[2].c = cli->flushchan; 174 a[2].v = &tag; 175 a[3].op = CHANRCV; 176 a[3].c = cli->readchan; 177 a[3].v = &buf; 178 a[4].op = CHANEND; 179 180 for(;;){ 181 switch(alt(a)){ 182 case 0: /* o = <-rpcchan */ 183 if(o == nil) 184 goto Done; 185 cli->nsend++; 186 /* set xid */ 187 o->xid = ++xidgen; 188 if(cli->needcount) 189 p = o->p+4; 190 else 191 p = o->p; 192 p[0] = xidgen>>24; 193 p[1] = xidgen>>16; 194 p[2] = xidgen>>8; 195 p[3] = xidgen; 196 if(write(cli->fd, o->p, o->n) != o->n){ 197 free(o->p); 198 o->p = nil; 199 snprint(o->err, sizeof o->err, "write: %r"); 200 sendp(o->creply, 0); 201 break; 202 } 203 if(nout >= mout){ 204 mout *= 2; 205 out = erealloc(out, mout*sizeof(out[0])); 206 } 207 o->st = msec(); 208 o->nresend = 0; 209 o->t = o->st + twait(cli->rtt.avg, 0); 210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t); 211 out[nout++] = o; 212 a[1].op = CHANRCV; 213 break; 214 215 case 1: /* <-timerchan */ 216 t = msec(); 217 for(i=0; i<nout; i++){ 218 o = out[i]; 219 if((int)(t - o->t) > 0){ 220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t); 221 if(cli->maxwait && t - o->st >= cli->maxwait){ 222 free(o->p); 223 o->p = nil; 224 strcpy(o->err, "timeout"); 225 sendp(o->creply, 0); 226 out[i--] = out[--nout]; 227 continue; 228 } 229 cli->nresend++; 230 o->nresend++; 231 o->t = t + twait(cli->rtt.avg, o->nresend); 232 if(write(cli->fd, o->p, o->n) != o->n){ 233 free(o->p); 234 o->p = nil; 235 snprint(o->err, sizeof o->err, "rewrite: %r"); 236 sendp(o->creply, 0); 237 out[i--] = out[--nout]; 238 continue; 239 } 240 } 241 } 242 /* stop ticking if no work; rpcchan will turn it back on */ 243 if(nout == 0) 244 a[1].op = CHANNOP; 245 break; 246 247 case 2: /* tag = <-flushchan */ 248 for(i=0; i<nout; i++){ 249 o = out[i]; 250 if(o->tag == tag){ 251 out[i--] = out[--nout]; 252 strcpy(o->err, "flushed"); 253 free(o->p); 254 o->p = nil; 255 sendp(o->creply, 0); 256 } 257 } 258 break; 259 260 case 3: /* buf = <-readchan */ 261 p = buf; 262 n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3]; 263 p += 4; 264 ep = p+n; 265 if(sunRpcUnpack(p, ep, &p, &rpc) < 0){ 266 fprint(2, "in: %.*H unpack failed\n", n, buf+4); 267 free(buf); 268 break; 269 } 270 if(cli->chatty) 271 fprint(2, "in: %B\n", &rpc); 272 if(rpc.iscall){ 273 fprint(2, "did not get reply\n"); 274 free(buf); 275 break; 276 } 277 o = nil; 278 for(i=0; i<nout; i++){ 279 o = out[i]; 280 if(o->xid == rpc.xid) 281 break; 282 } 283 if(i==nout){ 284 if(cli->chatty) fprint(2, "did not find waiting request\n"); 285 free(buf); 286 break; 287 } 288 out[i] = out[--nout]; 289 free(o->p); 290 o->p = nil; 291 if(rpc.status == SunSuccess){ 292 o->p = buf; 293 o->rpc = rpc; 294 }else{ 295 o->p = nil; 296 free(buf); 297 sunErrstr(rpc.status); 298 rerrstr(o->err, sizeof o->err); 299 } 300 sendp(o->creply, 0); 301 break; 302 } 303 } 304 Done: 305 free(out); 306 sendp(cli->dying, 0); 307 } 308 309 SunClient* 310 sunDial(char *address) 311 { 312 int fd; 313 SunClient *cli; 314 315 if((fd = dial(address, 0, 0, 0)) < 0) 316 return nil; 317 318 cli = emalloc(sizeof(SunClient)); 319 cli->fd = fd; 320 cli->maxwait = 15000; 321 cli->rtt.avg = 1000; 322 cli->dying = chancreate(sizeof(void*), 0); 323 cli->rpcchan = chancreate(sizeof(Out*), 0); 324 cli->timerchan = chancreate(sizeof(ulong), 0); 325 cli->flushchan = chancreate(sizeof(ulong), 0); 326 cli->readchan = chancreate(sizeof(uchar*), 0); 327 if(strstr(address, "udp!")){ 328 cli->needcount = 0; 329 cli->nettid = threadcreate(udpThread, cli, SunStackSize); 330 cli->timertid = threadcreate(timerThread, cli, SunStackSize); 331 }else{ 332 cli->needcount = 1; 333 cli->nettid = threadcreate(netThread, cli, SunStackSize); 334 /* assume reliable: don't need timer */ 335 /* BUG: netThread should know how to redial */ 336 } 337 threadcreate(rpcMuxThread, cli, SunStackSize); 338 339 return cli; 340 } 341 342 void 343 sunClientClose(SunClient *cli) 344 { 345 int n; 346 347 /* 348 * Threadints get you out of any stuck system calls 349 * or thread rendezvouses, but do nothing if the thread 350 * is in the ready state. Keep interrupting until it takes. 351 */ 352 n = 0; 353 if(!cli->timertid) 354 n++; 355 while(n < 2){ 356 threadint(cli->nettid); 357 if(cli->timertid) 358 threadint(cli->timertid); 359 yield(); 360 while(nbrecv(cli->dying, nil) == 1) 361 n++; 362 } 363 364 sendp(cli->rpcchan, 0); 365 recvp(cli->dying); 366 367 /* everyone's gone: clean up */ 368 close(cli->fd); 369 chanfree(cli->flushchan); 370 chanfree(cli->readchan); 371 chanfree(cli->timerchan); 372 free(cli); 373 } 374 375 void 376 sunClientFlushRpc(SunClient *cli, ulong tag) 377 { 378 sendul(cli->flushchan, tag); 379 } 380 381 void 382 sunClientProg(SunClient *cli, SunProg *p) 383 { 384 if(cli->nprog%16 == 0) 385 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0])); 386 cli->prog[cli->nprog++] = p; 387 } 388 389 int 390 sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree) 391 { 392 uchar *bp, *p, *ep; 393 int i, n1, n2, n, nn; 394 Out o; 395 SunProg *prog; 396 SunStatus ok; 397 398 for(i=0; i<cli->nprog; i++) 399 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers) 400 break; 401 if(i==cli->nprog){ 402 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers); 403 return -1; 404 } 405 prog = cli->prog[i]; 406 407 if(cli->chatty){ 408 fprint(2, "out: %B\n", &tx->rpc); 409 fprint(2, "\t%C\n", tx); 410 } 411 412 n1 = sunRpcSize(&tx->rpc); 413 n2 = sunCallSize(prog, tx); 414 415 n = n1+n2; 416 if(cli->needcount) 417 n += 4; 418 419 bp = emalloc(n); 420 ep = bp+n; 421 p = bp; 422 if(cli->needcount){ 423 nn = n-4; 424 p[0] = (nn>>24)|0x80; 425 p[1] = nn>>16; 426 p[2] = nn>>8; 427 p[3] = nn; 428 p += 4; 429 } 430 if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess 431 || (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){ 432 sunErrstr(ok); 433 free(bp); 434 return -1; 435 } 436 if(p != ep){ 437 werrstr("rpc: packet size mismatch"); 438 free(bp); 439 return -1; 440 } 441 442 memset(&o, 0, sizeof o); 443 o.creply = chancreate(sizeof(void*), 0); 444 o.tag = tag; 445 o.p = bp; 446 o.n = n; 447 448 sendp(cli->rpcchan, &o); 449 recvp(o.creply); 450 chanfree(o.creply); 451 452 if(o.p == nil){ 453 werrstr("%s", o.err); 454 return -1; 455 } 456 457 p = o.rpc.data; 458 ep = p+o.rpc.ndata; 459 rx->rpc = o.rpc; 460 rx->rpc.proc = tx->rpc.proc; 461 rx->rpc.prog = tx->rpc.prog; 462 rx->rpc.vers = tx->rpc.vers; 463 rx->type = (rx->rpc.proc<<1)|1; 464 if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){ 465 sunErrstr(ok); 466 werrstr("unpack: %r"); 467 free(o.p); 468 return -1; 469 } 470 471 if(cli->chatty){ 472 fprint(2, "in: %B\n", &rx->rpc); 473 fprint(2, "in:\t%C\n", rx); 474 } 475 476 if(tofree) 477 *tofree = o.p; 478 else 479 free(o.p); 480 481 return 0; 482 } 483