1 /* 2 * Sun RPC client. 3 */ 4 #include <u.h> 5 #include <libc.h> 6 #include <thread.h> 7 #include <sunrpc.h> 8 9 typedef struct Out Out; 10 struct Out 11 { 12 char err[ERRMAX]; /* error string */ 13 Channel *creply; /* send to finish rpc */ 14 uchar *p; /* pending request packet */ 15 int n; /* size of request */ 16 ulong tag; /* flush tag of pending request */ 17 ulong xid; /* xid of pending request */ 18 ulong st; /* first send time */ 19 ulong t; /* resend time */ 20 int nresend; /* number of resends */ 21 SunRpc rpc; /* response rpc */ 22 }; 23 24 static void 25 udpThread(void *v) 26 { 27 uchar *p, *buf; 28 Ioproc *io; 29 int n; 30 SunClient *cli; 31 enum { BufSize = 65536 }; 32 33 cli = v; 34 buf = emalloc(BufSize); 35 io = ioproc(); 36 p = nil; 37 for(;;){ 38 n = ioread(io, cli->fd, buf, BufSize); 39 if(n <= 0) 40 break; 41 p = emalloc(4+n); 42 memmove(p+4, buf, n); 43 p[0] = n>>24; 44 p[1] = n>>16; 45 p[2] = n>>8; 46 p[3] = n; 47 if(sendp(cli->readchan, p) == 0) 48 break; 49 p = nil; 50 } 51 free(p); 52 closeioproc(io); 53 while(send(cli->dying, nil) == -1) 54 ; 55 } 56 57 static void 58 netThread(void *v) 59 { 60 uchar *p, buf[4]; 61 Ioproc *io; 62 uint n, tot; 63 int done; 64 SunClient *cli; 65 66 cli = v; 67 io = ioproc(); 68 tot = 0; 69 p = nil; 70 for(;;){ 71 n = ioreadn(io, cli->fd, buf, 4); 72 if(n != 4) 73 break; 74 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3]; 75 if(cli->chatty) 76 fprint(2, "%.8ux...", n); 77 done = n&0x80000000; 78 n &= ~0x80000000; 79 if(tot == 0){ 80 p = emalloc(4+n); 81 tot = 4; 82 }else 83 p = erealloc(p, tot+n); 84 if(ioreadn(io, cli->fd, p+tot, n) != n) 85 break; 86 tot += n; 87 if(done){ 88 p[0] = tot>>24; 89 p[1] = tot>>16; 90 p[2] = tot>>8; 91 p[3] = tot; 92 if(sendp(cli->readchan, p) == 0) 93 break; 94 p = nil; 95 tot = 0; 96 } 97 } 98 free(p); 99 closeioproc(io); 100 while(send(cli->dying, 0) == -1) 101 ; 102 } 103 104 static void 105 timerThread(void *v) 106 { 107 Ioproc *io; 108 SunClient *cli; 109 110 cli = v; 111 io = ioproc(); 112 for(;;){ 113 if(iosleep(io, 200) < 0) 114 break; 115 if(sendul(cli->timerchan, 0) == 0) 116 break; 117 } 118 closeioproc(io); 119 while(send(cli->dying, 0) == -1) 120 ; 121 } 122 123 static ulong 124 msec(void) 125 { 126 return nsec()/1000000; 127 } 128 129 static ulong 130 twait(ulong rtt, int nresend) 131 { 132 ulong t; 133 134 t = rtt; 135 if(nresend <= 1) 136 {} 137 else if(nresend <= 3) 138 t *= 2; 139 else if(nresend <= 18) 140 t <<= nresend-2; 141 else 142 t = 60*1000; 143 if(t > 60*1000) 144 t = 60*1000; 145 146 return t; 147 } 148 149 static void 150 rpcMuxThread(void *v) 151 { 152 uchar *buf, *p, *ep; 153 int i, n, nout, mout; 154 ulong t, xidgen, tag; 155 Alt a[5]; 156 Out *o, **out; 157 SunRpc rpc; 158 SunClient *cli; 159 160 cli = v; 161 mout = 16; 162 nout = 0; 163 out = emalloc(mout*sizeof(out[0])); 164 xidgen = truerand(); 165 166 a[0].op = CHANRCV; 167 a[0].c = cli->rpcchan; 168 a[0].v = &o; 169 a[1].op = CHANNOP; 170 a[1].c = cli->timerchan; 171 a[1].v = nil; 172 a[2].op = CHANRCV; 173 a[2].c = cli->flushchan; 174 a[2].v = &tag; 175 a[3].op = CHANRCV; 176 a[3].c = cli->readchan; 177 a[3].v = &buf; 178 a[4].op = CHANEND; 179 180 for(;;){ 181 switch(alt(a)){ 182 case 0: /* rpcchan */ 183 if(o == nil) 184 goto Done; 185 cli->nsend++; 186 /* set xid */ 187 o->xid = ++xidgen; 188 if(cli->needcount) 189 p = o->p+4; 190 else 191 p = o->p; 192 p[0] = xidgen>>24; 193 p[1] = xidgen>>16; 194 p[2] = xidgen>>8; 195 p[3] = xidgen; 196 if(write(cli->fd, o->p, o->n) != o->n){ 197 free(o->p); 198 o->p = nil; 199 snprint(o->err, sizeof o->err, "write: %r"); 200 sendp(o->creply, 0); 201 break; 202 } 203 if(nout >= mout){ 204 mout *= 2; 205 out = erealloc(out, mout*sizeof(out[0])); 206 } 207 o->st = msec(); 208 o->nresend = 0; 209 o->t = o->st + twait(cli->rtt.avg, 0); 210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t); 211 out[nout++] = o; 212 a[1].op = CHANRCV; 213 break; 214 215 case 1: /* timerchan */ 216 t = msec(); 217 for(i=0; i<nout; i++){ 218 o = out[i]; 219 if((int)(t - o->t) > 0){ 220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t); 221 if(cli->maxwait && t - o->st >= cli->maxwait){ 222 free(o->p); 223 o->p = nil; 224 strcpy(o->err, "timeout"); 225 sendp(o->creply, 0); 226 out[i--] = out[--nout]; 227 continue; 228 } 229 cli->nresend++; 230 o->nresend++; 231 o->t = t + twait(cli->rtt.avg, o->nresend); 232 if(write(cli->fd, o->p, o->n) != o->n){ 233 free(o->p); 234 o->p = nil; 235 snprint(o->err, sizeof o->err, "rewrite: %r"); 236 sendp(o->creply, 0); 237 out[i--] = out[--nout]; 238 continue; 239 } 240 } 241 } 242 /* stop ticking if no work; rpcchan will turn it back on */ 243 if(nout == 0) 244 a[1].op = CHANNOP; 245 break; 246 247 case 2: /* flushchan */ 248 for(i=0; i<nout; i++){ 249 o = out[i]; 250 if(o->tag == tag){ 251 out[i--] = out[--nout]; 252 strcpy(o->err, "flushed"); 253 free(o->p); 254 o->p = nil; 255 sendp(o->creply, 0); 256 } 257 } 258 break; 259 260 case 3: /* readchan */ 261 p = buf; 262 n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3]; 263 p += 4; 264 ep = p+n; 265 if(sunRpcUnpack(p, ep, &p, &rpc) < 0){ 266 fprint(2, "in: %.*H unpack failed\n", n, buf+4); 267 free(buf); 268 break; 269 } 270 if(cli->chatty) 271 fprint(2, "in: %B\n", &rpc); 272 if(rpc.iscall){ 273 fprint(2, "did not get reply\n"); 274 free(buf); 275 break; 276 } 277 for(i=0; i<nout; i++) 278 if(o->xid == rpc.xid) 279 break; 280 if(i==nout){ 281 if(cli->chatty) fprint(2, "did not find waiting request\n"); 282 free(buf); 283 break; 284 } 285 out[i] = out[--nout]; 286 free(o->p); 287 o->p = nil; 288 if(rpc.status == SunSuccess){ 289 o->p = buf; 290 o->rpc = rpc; 291 }else{ 292 o->p = nil; 293 free(buf); 294 sunErrstr(rpc.status); 295 rerrstr(o->err, sizeof o->err); 296 } 297 sendp(o->creply, 0); 298 break; 299 } 300 } 301 Done: 302 free(out); 303 sendp(cli->dying, 0); 304 } 305 306 SunClient* 307 sunDial(char *address) 308 { 309 int fd; 310 SunClient *cli; 311 312 if((fd = dial(address, 0, 0, 0)) < 0) 313 return nil; 314 315 cli = emalloc(sizeof(SunClient)); 316 cli->fd = fd; 317 cli->maxwait = 15000; 318 cli->rtt.avg = 1000; 319 cli->dying = chancreate(sizeof(void*), 0); 320 cli->rpcchan = chancreate(sizeof(Out*), 0); 321 cli->timerchan = chancreate(sizeof(ulong), 0); 322 cli->flushchan = chancreate(sizeof(ulong), 0); 323 cli->readchan = chancreate(sizeof(uchar*), 0); 324 if(strstr(address, "udp!")){ 325 cli->needcount = 0; 326 cli->nettid = threadcreate(udpThread, cli, SunStackSize); 327 cli->timertid = threadcreate(timerThread, cli, SunStackSize); 328 }else{ 329 cli->needcount = 1; 330 cli->nettid = threadcreate(netThread, cli, SunStackSize); 331 /* assume reliable: don't need timer */ 332 /* BUG: netThread should know how to redial */ 333 } 334 threadcreate(rpcMuxThread, cli, SunStackSize); 335 336 return cli; 337 } 338 339 void 340 sunClientClose(SunClient *cli) 341 { 342 int n; 343 344 /* 345 * Threadints get you out of any stuck system calls 346 * or thread rendezvouses, but do nothing if the thread 347 * is in the ready state. Keep interrupting until it takes. 348 */ 349 n = 0; 350 if(!cli->timertid) 351 n++; 352 while(n < 2){ 353 threadint(cli->nettid); 354 if(cli->timertid) 355 threadint(cli->timertid); 356 yield(); 357 while(nbrecv(cli->dying, nil) == 1) 358 n++; 359 } 360 361 sendp(cli->rpcchan, 0); 362 recvp(cli->dying); 363 364 /* everyone's gone: clean up */ 365 close(cli->fd); 366 chanfree(cli->flushchan); 367 chanfree(cli->readchan); 368 chanfree(cli->timerchan); 369 free(cli); 370 } 371 372 void 373 sunClientFlushRpc(SunClient *cli, ulong tag) 374 { 375 sendul(cli->flushchan, tag); 376 } 377 378 void 379 sunClientProg(SunClient *cli, SunProg *p) 380 { 381 if(cli->nprog%16 == 0) 382 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0])); 383 cli->prog[cli->nprog++] = p; 384 } 385 386 int 387 sunClientRpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree) 388 { 389 uchar *bp, *p, *ep; 390 int i, n1, n2, n, nn; 391 Out o; 392 SunProg *prog; 393 SunStatus ok; 394 395 for(i=0; i<cli->nprog; i++) 396 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers) 397 break; 398 if(i==cli->nprog){ 399 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers); 400 return -1; 401 } 402 prog = cli->prog[i]; 403 404 if(cli->chatty){ 405 fprint(2, "out: %B\n", &tx->rpc); 406 fprint(2, "\t%C\n", tx); 407 } 408 409 n1 = sunRpcSize(&tx->rpc); 410 n2 = sunCallSize(prog, tx); 411 412 n = n1+n2; 413 if(cli->needcount) 414 n += 4; 415 416 bp = emalloc(n); 417 ep = bp+n; 418 p = bp; 419 if(cli->needcount){ 420 nn = n-4; 421 p[0] = (nn>>24)|0x80; 422 p[1] = nn>>16; 423 p[2] = nn>>8; 424 p[3] = nn; 425 p += 4; 426 } 427 if((ok = sunRpcPack(p, ep, &p, &tx->rpc)) != SunSuccess 428 || (ok = sunCallPack(prog, p, ep, &p, tx)) != SunSuccess){ 429 sunErrstr(ok); 430 free(bp); 431 return -1; 432 } 433 if(p != ep){ 434 werrstr("rpc: packet size mismatch"); 435 free(bp); 436 return -1; 437 } 438 439 memset(&o, 0, sizeof o); 440 o.creply = chancreate(sizeof(void*), 0); 441 o.tag = tag; 442 o.p = bp; 443 o.n = n; 444 445 sendp(cli->rpcchan, &o); 446 recvp(o.creply); 447 chanfree(o.creply); 448 449 if(o.p == nil){ 450 werrstr("%s", o.err); 451 return -1; 452 } 453 454 p = o.rpc.data; 455 ep = p+o.rpc.ndata; 456 rx->rpc = o.rpc; 457 rx->rpc.proc = tx->rpc.proc; 458 rx->rpc.prog = tx->rpc.prog; 459 rx->rpc.vers = tx->rpc.vers; 460 rx->type = (rx->rpc.proc<<1)|1; 461 if((ok = sunCallUnpack(prog, p, ep, &p, rx)) != SunSuccess){ 462 sunErrstr(ok); 463 werrstr("unpack: %r"); 464 free(o.p); 465 return -1; 466 } 467 468 if(cli->chatty){ 469 fprint(2, "in: %B\n", &rx->rpc); 470 fprint(2, "in:\t%C\n", rx); 471 } 472 473 if(tofree) 474 *tofree = o.p; 475 else 476 free(o.p); 477 478 return 0; 479 } 480