1 /* 2 * Reliable User Datagram Protocol, currently only for IPv4. 3 * This protocol is compatible with UDP's packet format. 4 * It could be done over UDP if need be. 5 */ 6 #include "u.h" 7 #include "../port/lib.h" 8 #include "mem.h" 9 #include "dat.h" 10 #include "fns.h" 11 #include "../port/error.h" 12 13 #include "ip.h" 14 15 #define DEBUG 0 16 #define DPRINT if(DEBUG)print 17 18 #define SEQDIFF(a,b) ( (a)>=(b)?\ 19 (a)-(b):\ 20 0xffffffffUL-((b)-(a)) ) 21 #define INSEQ(a,start,end) ( (start)<=(end)?\ 22 ((a)>(start)&&(a)<=(end)):\ 23 ((a)>(start)||(a)<=(end)) ) 24 #define UNACKED(r) SEQDIFF(r->sndseq, r->ackrcvd) 25 #define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 ) 26 27 enum 28 { 29 UDP_PHDRSIZE = 12, /* pseudo header */ 30 // UDP_HDRSIZE = 20, /* pseudo header + udp header */ 31 UDP_RHDRSIZE = 36, /* pseudo header + udp header + rudp header */ 32 UDP_IPHDR = 8, /* ip header */ 33 IP_UDPPROTO = 254, 34 UDP_USEAD7 = 52, /* size of new ipv6 headers struct */ 35 36 Rudprxms = 200, 37 Rudptickms = 50, 38 Rudpmaxxmit = 10, 39 Maxunacked = 100, 40 }; 41 42 #define Hangupgen 0xffffffff /* used only in hangup messages */ 43 44 typedef struct Udphdr Udphdr; 45 struct Udphdr 46 { 47 /* ip header */ 48 uchar vihl; /* Version and header length */ 49 uchar tos; /* Type of service */ 50 uchar length[2]; /* packet length */ 51 uchar id[2]; /* Identification */ 52 uchar frag[2]; /* Fragment information */ 53 54 /* pseudo header starts here */ 55 uchar Unused; 56 uchar udpproto; /* Protocol */ 57 uchar udpplen[2]; /* Header plus data length */ 58 uchar udpsrc[4]; /* Ip source */ 59 uchar udpdst[4]; /* Ip destination */ 60 61 /* udp header */ 62 uchar udpsport[2]; /* Source port */ 63 uchar udpdport[2]; /* Destination port */ 64 uchar udplen[2]; /* data length */ 65 uchar udpcksum[2]; /* Checksum */ 66 }; 67 68 typedef struct Rudphdr Rudphdr; 69 struct Rudphdr 70 { 71 /* ip header */ 72 uchar vihl; /* Version and header length */ 73 uchar tos; /* Type of service */ 74 uchar length[2]; /* packet length */ 75 uchar id[2]; /* Identification */ 76 uchar frag[2]; /* Fragment information */ 77 78 /* pseudo header starts here */ 79 uchar Unused; 80 uchar udpproto; /* Protocol */ 81 uchar udpplen[2]; /* Header plus data length */ 82 uchar udpsrc[4]; /* Ip source */ 83 uchar udpdst[4]; /* Ip destination */ 84 85 /* udp header */ 86 uchar udpsport[2]; /* Source port */ 87 uchar udpdport[2]; /* Destination port */ 88 uchar udplen[2]; /* data length (includes rudp header) */ 89 uchar udpcksum[2]; /* Checksum */ 90 91 /* rudp header */ 92 uchar relseq[4]; /* id of this packet (or 0) */ 93 uchar relsgen[4]; /* generation/time stamp */ 94 uchar relack[4]; /* packet being acked (or 0) */ 95 uchar relagen[4]; /* generation/time stamp */ 96 }; 97 98 99 /* 100 * one state structure per destination 101 */ 102 typedef struct Reliable Reliable; 103 struct Reliable 104 { 105 Ref; 106 107 Reliable *next; 108 109 uchar addr[IPaddrlen]; /* always V6 when put here */ 110 ushort port; 111 112 Block *unacked; /* unacked msg list */ 113 Block *unackedtail; /* and its tail */ 114 115 int timeout; /* time since first unacked msg sent */ 116 int xmits; /* number of times first unacked msg sent */ 117 118 ulong sndseq; /* next packet to be sent */ 119 ulong sndgen; /* and its generation */ 120 121 ulong rcvseq; /* last packet received */ 122 ulong rcvgen; /* and its generation */ 123 124 ulong acksent; /* last ack sent */ 125 ulong ackrcvd; /* last msg for which ack was rcvd */ 126 127 /* flow control */ 128 QLock lock; 129 Rendez vous; 130 int blocked; 131 }; 132 133 134 135 /* MIB II counters */ 136 typedef struct Rudpstats Rudpstats; 137 struct Rudpstats 138 { 139 ulong rudpInDatagrams; 140 ulong rudpNoPorts; 141 ulong rudpInErrors; 142 ulong rudpOutDatagrams; 143 }; 144 145 typedef struct Rudppriv Rudppriv; 146 struct Rudppriv 147 { 148 Ipht ht; 149 150 /* MIB counters */ 151 Rudpstats ustats; 152 153 /* non-MIB stats */ 154 ulong csumerr; /* checksum errors */ 155 ulong lenerr; /* short packet */ 156 ulong rxmits; /* # of retransmissions */ 157 ulong orders; /* # of out of order pkts */ 158 159 /* keeping track of the ack kproc */ 160 int ackprocstarted; 161 QLock apl; 162 }; 163 164 165 static ulong generation = 0; 166 static Rendez rend; 167 168 /* 169 * protocol specific part of Conv 170 */ 171 typedef struct Rudpcb Rudpcb; 172 struct Rudpcb 173 { 174 QLock; 175 uchar headers; 176 uchar randdrop; 177 Reliable *r; 178 }; 179 180 /* 181 * local functions 182 */ 183 void relsendack(Conv*, Reliable*, int); 184 int reliput(Conv*, Block*, uchar*, ushort); 185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*); 186 void relput(Reliable*); 187 void relforget(Conv *, uchar*, int, int); 188 void relackproc(void *); 189 void relackq(Reliable *, Block*); 190 void relhangup(Conv *, Reliable*); 191 void relrexmit(Conv *, Reliable*); 192 void relput(Reliable*); 193 void rudpkick(void *x); 194 195 static void 196 rudpstartackproc(Proto *rudp) 197 { 198 Rudppriv *rpriv; 199 char kpname[KNAMELEN]; 200 201 rpriv = rudp->priv; 202 if(rpriv->ackprocstarted == 0){ 203 qlock(&rpriv->apl); 204 if(rpriv->ackprocstarted == 0){ 205 snprint(kpname, sizeof kpname, "#I%drudpack", 206 rudp->f->dev); 207 kproc(kpname, relackproc, rudp); 208 rpriv->ackprocstarted = 1; 209 } 210 qunlock(&rpriv->apl); 211 } 212 } 213 214 static char* 215 rudpconnect(Conv *c, char **argv, int argc) 216 { 217 char *e; 218 Rudppriv *upriv; 219 220 upriv = c->p->priv; 221 rudpstartackproc(c->p); 222 e = Fsstdconnect(c, argv, argc); 223 Fsconnected(c, e); 224 iphtadd(&upriv->ht, c); 225 226 return e; 227 } 228 229 230 static int 231 rudpstate(Conv *c, char *state, int n) 232 { 233 Rudpcb *ucb; 234 Reliable *r; 235 int m; 236 237 m = snprint(state, n, "%s", c->inuse?"Open":"Closed"); 238 ucb = (Rudpcb*)c->ptcl; 239 qlock(ucb); 240 for(r = ucb->r; r; r = r->next) 241 m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r)); 242 m += snprint(state+m, n-m, "\n"); 243 qunlock(ucb); 244 return m; 245 } 246 247 static char* 248 rudpannounce(Conv *c, char** argv, int argc) 249 { 250 char *e; 251 Rudppriv *upriv; 252 253 upriv = c->p->priv; 254 rudpstartackproc(c->p); 255 e = Fsstdannounce(c, argv, argc); 256 if(e != nil) 257 return e; 258 Fsconnected(c, nil); 259 iphtadd(&upriv->ht, c); 260 261 return nil; 262 } 263 264 static void 265 rudpcreate(Conv *c) 266 { 267 c->rq = qopen(64*1024, Qmsg, 0, 0); 268 c->wq = qopen(64*1024, Qkick, rudpkick, c); 269 } 270 271 static void 272 rudpclose(Conv *c) 273 { 274 Rudpcb *ucb; 275 Reliable *r, *nr; 276 Rudppriv *upriv; 277 278 upriv = c->p->priv; 279 iphtrem(&upriv->ht, c); 280 281 /* force out any delayed acks */ 282 ucb = (Rudpcb*)c->ptcl; 283 qlock(ucb); 284 for(r = ucb->r; r; r = r->next){ 285 if(r->acksent != r->rcvseq) 286 relsendack(c, r, 0); 287 } 288 qunlock(ucb); 289 290 qclose(c->rq); 291 qclose(c->wq); 292 qclose(c->eq); 293 ipmove(c->laddr, IPnoaddr); 294 ipmove(c->raddr, IPnoaddr); 295 c->lport = 0; 296 c->rport = 0; 297 298 ucb->headers = 0; 299 ucb->randdrop = 0; 300 qlock(ucb); 301 for(r = ucb->r; r; r = nr){ 302 if(r->acksent != r->rcvseq) 303 relsendack(c, r, 0); 304 nr = r->next; 305 relhangup(c, r); 306 relput(r); 307 } 308 ucb->r = 0; 309 310 qunlock(ucb); 311 } 312 313 /* 314 * randomly don't send packets 315 */ 316 static void 317 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos) 318 { 319 Rudpcb *ucb; 320 321 ucb = (Rudpcb*)c->ptcl; 322 if(ucb->randdrop && nrand(100) < ucb->randdrop) 323 freeblist(bp); 324 else 325 ipoput4(f, bp, x, ttl, tos, nil); 326 } 327 328 int 329 flow(void *v) 330 { 331 Reliable *r = v; 332 333 return UNACKED(r) <= Maxunacked; 334 } 335 336 void 337 rudpkick(void *x) 338 { 339 Conv *c = x; 340 Udphdr *uh; 341 ushort rport; 342 uchar laddr[IPaddrlen], raddr[IPaddrlen]; 343 Block *bp; 344 Rudpcb *ucb; 345 Rudphdr *rh; 346 Reliable *r; 347 int dlen, ptcllen; 348 Rudppriv *upriv; 349 Fs *f; 350 351 upriv = c->p->priv; 352 f = c->p->f; 353 354 netlog(c->p->f, Logrudp, "rudp: kick\n"); 355 bp = qget(c->wq); 356 if(bp == nil) 357 return; 358 359 ucb = (Rudpcb*)c->ptcl; 360 switch(ucb->headers) { 361 case 7: 362 /* get user specified addresses */ 363 bp = pullupblock(bp, UDP_USEAD7); 364 if(bp == nil) 365 return; 366 ipmove(raddr, bp->rp); 367 bp->rp += IPaddrlen; 368 ipmove(laddr, bp->rp); 369 bp->rp += IPaddrlen; 370 /* pick interface closest to dest */ 371 if(ipforme(f, laddr) != Runi) 372 findlocalip(f, laddr, raddr); 373 bp->rp += IPaddrlen; /* Ignore ifc address */ 374 rport = nhgets(bp->rp); 375 bp->rp += 2+2; /* Ignore local port */ 376 break; 377 default: 378 ipmove(raddr, c->raddr); 379 ipmove(laddr, c->laddr); 380 rport = c->rport; 381 break; 382 } 383 384 dlen = blocklen(bp); 385 386 /* Make space to fit rudp & ip header */ 387 bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE); 388 if(bp == nil) 389 return; 390 391 uh = (Udphdr *)(bp->rp); 392 uh->vihl = IP_VER4; 393 394 rh = (Rudphdr*)uh; 395 396 ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE); 397 uh->Unused = 0; 398 uh->udpproto = IP_UDPPROTO; 399 uh->frag[0] = 0; 400 uh->frag[1] = 0; 401 hnputs(uh->udpplen, ptcllen); 402 switch(ucb->headers){ 403 case 7: 404 v6tov4(uh->udpdst, raddr); 405 hnputs(uh->udpdport, rport); 406 v6tov4(uh->udpsrc, laddr); 407 break; 408 default: 409 v6tov4(uh->udpdst, c->raddr); 410 hnputs(uh->udpdport, c->rport); 411 if(ipcmp(c->laddr, IPnoaddr) == 0) 412 findlocalip(f, c->laddr, c->raddr); 413 v6tov4(uh->udpsrc, c->laddr); 414 break; 415 } 416 hnputs(uh->udpsport, c->lport); 417 hnputs(uh->udplen, ptcllen); 418 uh->udpcksum[0] = 0; 419 uh->udpcksum[1] = 0; 420 421 qlock(ucb); 422 r = relstate(ucb, raddr, rport, "kick"); 423 r->sndseq = NEXTSEQ(r->sndseq); 424 hnputl(rh->relseq, r->sndseq); 425 hnputl(rh->relsgen, r->sndgen); 426 427 hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */ 428 hnputl(rh->relagen, r->rcvgen); 429 430 if(r->rcvseq != r->acksent) 431 r->acksent = r->rcvseq; 432 433 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE)); 434 435 relackq(r, bp); 436 qunlock(ucb); 437 438 upriv->ustats.rudpOutDatagrams++; 439 440 DPRINT("sent: %lud/%lud, %lud/%lud\n", 441 r->sndseq, r->sndgen, r->rcvseq, r->rcvgen); 442 443 doipoput(c, f, bp, 0, c->ttl, c->tos); 444 445 if(waserror()) { 446 relput(r); 447 qunlock(&r->lock); 448 nexterror(); 449 } 450 451 /* flow control of sorts */ 452 qlock(&r->lock); 453 if(UNACKED(r) > Maxunacked){ 454 r->blocked = 1; 455 sleep(&r->vous, flow, r); 456 r->blocked = 0; 457 } 458 459 qunlock(&r->lock); 460 relput(r); 461 poperror(); 462 } 463 464 void 465 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp) 466 { 467 int len, olen, ottl; 468 Udphdr *uh; 469 Conv *c; 470 Rudpcb *ucb; 471 uchar raddr[IPaddrlen], laddr[IPaddrlen]; 472 ushort rport, lport; 473 Rudppriv *upriv; 474 Fs *f; 475 uchar *p; 476 477 upriv = rudp->priv; 478 f = rudp->f; 479 480 upriv->ustats.rudpInDatagrams++; 481 482 uh = (Udphdr*)(bp->rp); 483 484 /* Put back pseudo header for checksum 485 * (remember old values for icmpnoconv()) 486 */ 487 ottl = uh->Unused; 488 uh->Unused = 0; 489 len = nhgets(uh->udplen); 490 olen = nhgets(uh->udpplen); 491 hnputs(uh->udpplen, len); 492 493 v4tov6(raddr, uh->udpsrc); 494 v4tov6(laddr, uh->udpdst); 495 lport = nhgets(uh->udpdport); 496 rport = nhgets(uh->udpsport); 497 498 if(nhgets(uh->udpcksum)) { 499 if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) { 500 upriv->ustats.rudpInErrors++; 501 upriv->csumerr++; 502 netlog(f, Logrudp, "rudp: checksum error %I\n", raddr); 503 DPRINT("rudp: checksum error %I\n", raddr); 504 freeblist(bp); 505 return; 506 } 507 } 508 509 qlock(rudp); 510 511 c = iphtlook(&upriv->ht, raddr, rport, laddr, lport); 512 if(c == nil){ 513 /* no conversation found */ 514 upriv->ustats.rudpNoPorts++; 515 qunlock(rudp); 516 netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport, 517 laddr, lport); 518 uh->Unused = ottl; 519 hnputs(uh->udpplen, olen); 520 icmpnoconv(f, bp); 521 freeblist(bp); 522 return; 523 } 524 ucb = (Rudpcb*)c->ptcl; 525 qlock(ucb); 526 qunlock(rudp); 527 528 if(reliput(c, bp, raddr, rport) < 0){ 529 qunlock(ucb); 530 freeb(bp); 531 return; 532 } 533 534 /* 535 * Trim the packet down to data size 536 */ 537 538 len -= (UDP_RHDRSIZE-UDP_PHDRSIZE); 539 bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len); 540 if(bp == nil) { 541 netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n", 542 raddr, rport, laddr, lport); 543 DPRINT("rudp: len err %I.%d -> %I.%d\n", 544 raddr, rport, laddr, lport); 545 upriv->lenerr++; 546 return; 547 } 548 549 netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n", 550 raddr, rport, laddr, lport, len); 551 552 switch(ucb->headers){ 553 case 7: 554 /* pass the src address */ 555 bp = padblock(bp, UDP_USEAD7); 556 p = bp->rp; 557 ipmove(p, raddr); p += IPaddrlen; 558 ipmove(p, laddr); p += IPaddrlen; 559 ipmove(p, ifc->lifc->local); p += IPaddrlen; 560 hnputs(p, rport); p += 2; 561 hnputs(p, lport); 562 break; 563 default: 564 /* connection oriented rudp */ 565 if(ipcmp(c->raddr, IPnoaddr) == 0){ 566 /* save the src address in the conversation */ 567 ipmove(c->raddr, raddr); 568 c->rport = rport; 569 570 /* reply with the same ip address (if not broadcast) */ 571 if(ipforme(f, laddr) == Runi) 572 ipmove(c->laddr, laddr); 573 else 574 v4tov6(c->laddr, ifc->lifc->local); 575 } 576 break; 577 } 578 if(bp->next) 579 bp = concatblock(bp); 580 581 if(qfull(c->rq)) { 582 netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport, 583 laddr, lport); 584 freeblist(bp); 585 } 586 else 587 qpass(c->rq, bp); 588 589 qunlock(ucb); 590 } 591 592 static char *rudpunknown = "unknown rudp ctl request"; 593 594 char* 595 rudpctl(Conv *c, char **f, int n) 596 { 597 Rudpcb *ucb; 598 uchar ip[IPaddrlen]; 599 int x; 600 601 ucb = (Rudpcb*)c->ptcl; 602 if(n < 1) 603 return rudpunknown; 604 605 if(strcmp(f[0], "headers") == 0){ 606 ucb->headers = 7; /* new headers format */ 607 return nil; 608 } else if(strcmp(f[0], "hangup") == 0){ 609 if(n < 3) 610 return "bad syntax"; 611 if (parseip(ip, f[1]) == -1) 612 return Ebadip; 613 x = atoi(f[2]); 614 qlock(ucb); 615 relforget(c, ip, x, 1); 616 qunlock(ucb); 617 return nil; 618 } else if(strcmp(f[0], "randdrop") == 0){ 619 x = 10; /* default is 10% */ 620 if(n > 1) 621 x = atoi(f[1]); 622 if(x > 100 || x < 0) 623 return "illegal rudp drop rate"; 624 ucb->randdrop = x; 625 return nil; 626 } 627 return rudpunknown; 628 } 629 630 void 631 rudpadvise(Proto *rudp, Block *bp, char *msg) 632 { 633 Udphdr *h; 634 uchar source[IPaddrlen], dest[IPaddrlen]; 635 ushort psource, pdest; 636 Conv *s, **p; 637 638 h = (Udphdr*)(bp->rp); 639 640 v4tov6(dest, h->udpdst); 641 v4tov6(source, h->udpsrc); 642 psource = nhgets(h->udpsport); 643 pdest = nhgets(h->udpdport); 644 645 /* Look for a connection */ 646 for(p = rudp->conv; *p; p++) { 647 s = *p; 648 if(s->rport == pdest) 649 if(s->lport == psource) 650 if(ipcmp(s->raddr, dest) == 0) 651 if(ipcmp(s->laddr, source) == 0){ 652 qhangup(s->rq, msg); 653 qhangup(s->wq, msg); 654 break; 655 } 656 } 657 freeblist(bp); 658 } 659 660 int 661 rudpstats(Proto *rudp, char *buf, int len) 662 { 663 Rudppriv *upriv; 664 665 upriv = rudp->priv; 666 return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n", 667 upriv->ustats.rudpInDatagrams, 668 upriv->ustats.rudpNoPorts, 669 upriv->ustats.rudpInErrors, 670 upriv->ustats.rudpOutDatagrams, 671 upriv->rxmits, 672 upriv->orders); 673 } 674 675 void 676 rudpinit(Fs *fs) 677 { 678 679 Proto *rudp; 680 681 rudp = smalloc(sizeof(Proto)); 682 rudp->priv = smalloc(sizeof(Rudppriv)); 683 rudp->name = "rudp"; 684 rudp->connect = rudpconnect; 685 rudp->announce = rudpannounce; 686 rudp->ctl = rudpctl; 687 rudp->state = rudpstate; 688 rudp->create = rudpcreate; 689 rudp->close = rudpclose; 690 rudp->rcv = rudpiput; 691 rudp->advise = rudpadvise; 692 rudp->stats = rudpstats; 693 rudp->ipproto = IP_UDPPROTO; 694 rudp->nc = 32; 695 rudp->ptclsize = sizeof(Rudpcb); 696 697 Fsproto(fs, rudp); 698 } 699 700 /*********************************************/ 701 /* Here starts the reliable helper functions */ 702 /*********************************************/ 703 /* 704 * Enqueue a copy of an unacked block for possible retransmissions 705 */ 706 void 707 relackq(Reliable *r, Block *bp) 708 { 709 Block *np; 710 711 np = copyblock(bp, blocklen(bp)); 712 if(r->unacked) 713 r->unackedtail->list = np; 714 else { 715 /* restart timer */ 716 r->timeout = 0; 717 r->xmits = 1; 718 r->unacked = np; 719 } 720 r->unackedtail = np; 721 np->list = nil; 722 } 723 724 /* 725 * retransmit unacked blocks 726 */ 727 void 728 relackproc(void *a) 729 { 730 Rudpcb *ucb; 731 Proto *rudp; 732 Reliable *r; 733 Conv **s, *c; 734 735 rudp = (Proto *)a; 736 737 loop: 738 tsleep(&up->sleep, return0, 0, Rudptickms); 739 740 for(s = rudp->conv; *s; s++) { 741 c = *s; 742 ucb = (Rudpcb*)c->ptcl; 743 qlock(ucb); 744 745 for(r = ucb->r; r; r = r->next) { 746 if(r->unacked != nil){ 747 r->timeout += Rudptickms; 748 if(r->timeout > Rudprxms*r->xmits) 749 relrexmit(c, r); 750 } 751 if(r->acksent != r->rcvseq) 752 relsendack(c, r, 0); 753 } 754 qunlock(ucb); 755 } 756 goto loop; 757 } 758 759 /* 760 * get the state record for a conversation 761 */ 762 Reliable* 763 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from) 764 { 765 Reliable *r, **l; 766 767 l = &ucb->r; 768 for(r = *l; r; r = *l){ 769 if(memcmp(addr, r->addr, IPaddrlen) == 0 && 770 port == r->port) 771 break; 772 l = &r->next; 773 } 774 775 /* no state for this addr/port, create some */ 776 if(r == nil){ 777 while(generation == 0) 778 generation = rand(); 779 780 DPRINT("from %s new state %lud for %I!%ud\n", 781 from, generation, addr, port); 782 783 r = smalloc(sizeof(Reliable)); 784 memmove(r->addr, addr, IPaddrlen); 785 r->port = port; 786 r->unacked = 0; 787 if(generation == Hangupgen) 788 generation++; 789 r->sndgen = generation++; 790 r->sndseq = 0; 791 r->ackrcvd = 0; 792 r->rcvgen = 0; 793 r->rcvseq = 0; 794 r->acksent = 0; 795 r->xmits = 0; 796 r->timeout = 0; 797 r->ref = 0; 798 incref(r); /* one reference for being in the list */ 799 800 *l = r; 801 } 802 803 incref(r); 804 return r; 805 } 806 807 void 808 relput(Reliable *r) 809 { 810 if(decref(r) == 0) 811 free(r); 812 } 813 814 /* 815 * forget a Reliable state 816 */ 817 void 818 relforget(Conv *c, uchar *ip, int port, int originator) 819 { 820 Rudpcb *ucb; 821 Reliable *r, **l; 822 823 ucb = (Rudpcb*)c->ptcl; 824 825 l = &ucb->r; 826 for(r = *l; r; r = *l){ 827 if(ipcmp(ip, r->addr) == 0 && port == r->port){ 828 *l = r->next; 829 if(originator) 830 relsendack(c, r, 1); 831 relhangup(c, r); 832 relput(r); /* remove from the list */ 833 break; 834 } 835 l = &r->next; 836 } 837 } 838 839 /* 840 * process a rcvd reliable packet. return -1 if not to be passed to user process, 841 * 0 therwise. 842 * 843 * called with ucb locked. 844 */ 845 int 846 reliput(Conv *c, Block *bp, uchar *addr, ushort port) 847 { 848 Block *nbp; 849 Rudpcb *ucb; 850 Rudppriv *upriv; 851 Udphdr *uh; 852 Reliable *r; 853 Rudphdr *rh; 854 ulong seq, ack, sgen, agen, ackreal; 855 int rv = -1; 856 857 /* get fields */ 858 uh = (Udphdr*)(bp->rp); 859 rh = (Rudphdr*)uh; 860 seq = nhgetl(rh->relseq); 861 sgen = nhgetl(rh->relsgen); 862 ack = nhgetl(rh->relack); 863 agen = nhgetl(rh->relagen); 864 865 upriv = c->p->priv; 866 ucb = (Rudpcb*)c->ptcl; 867 r = relstate(ucb, addr, port, "input"); 868 869 DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n", 870 seq, sgen, ack, agen, r->sndgen); 871 872 /* if acking an incorrect generation, ignore */ 873 if(ack && agen != r->sndgen) 874 goto out; 875 876 /* Look for a hangup */ 877 if(sgen == Hangupgen) { 878 if(agen == r->sndgen) 879 relforget(c, addr, port, 0); 880 goto out; 881 } 882 883 /* make sure we're not talking to a new remote side */ 884 if(r->rcvgen != sgen){ 885 if(seq != 0 && seq != 1) 886 goto out; 887 888 /* new connection */ 889 if(r->rcvgen != 0){ 890 DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen); 891 relhangup(c, r); 892 } 893 r->rcvgen = sgen; 894 } 895 896 /* dequeue acked packets */ 897 if(ack && agen == r->sndgen){ 898 ackreal = 0; 899 while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){ 900 nbp = r->unacked; 901 r->unacked = nbp->list; 902 DPRINT("%lud/%lud acked, r->sndgen = %lud\n", 903 ack, agen, r->sndgen); 904 freeb(nbp); 905 r->ackrcvd = NEXTSEQ(r->ackrcvd); 906 ackreal = 1; 907 } 908 909 /* flow control */ 910 if(UNACKED(r) < Maxunacked/8 && r->blocked) 911 wakeup(&r->vous); 912 913 /* 914 * retransmit next packet if the acked packet 915 * was transmitted more than once 916 */ 917 if(ackreal && r->unacked != nil){ 918 r->timeout = 0; 919 if(r->xmits > 1){ 920 r->xmits = 1; 921 relrexmit(c, r); 922 } 923 } 924 925 } 926 927 /* no message or input queue full */ 928 if(seq == 0 || qfull(c->rq)) 929 goto out; 930 931 /* refuse out of order delivery */ 932 if(seq != NEXTSEQ(r->rcvseq)){ 933 relsendack(c, r, 0); /* tell him we got it already */ 934 upriv->orders++; 935 DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq)); 936 goto out; 937 } 938 r->rcvseq = seq; 939 940 rv = 0; 941 out: 942 relput(r); 943 return rv; 944 } 945 946 void 947 relsendack(Conv *c, Reliable *r, int hangup) 948 { 949 Udphdr *uh; 950 Block *bp; 951 Rudphdr *rh; 952 int ptcllen; 953 Fs *f; 954 955 bp = allocb(UDP_IPHDR + UDP_RHDRSIZE); 956 if(bp == nil) 957 return; 958 bp->wp += UDP_IPHDR + UDP_RHDRSIZE; 959 f = c->p->f; 960 uh = (Udphdr *)(bp->rp); 961 uh->vihl = IP_VER4; 962 rh = (Rudphdr*)uh; 963 964 ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE); 965 uh->Unused = 0; 966 uh->udpproto = IP_UDPPROTO; 967 uh->frag[0] = 0; 968 uh->frag[1] = 0; 969 hnputs(uh->udpplen, ptcllen); 970 971 v6tov4(uh->udpdst, r->addr); 972 hnputs(uh->udpdport, r->port); 973 hnputs(uh->udpsport, c->lport); 974 if(ipcmp(c->laddr, IPnoaddr) == 0) 975 findlocalip(f, c->laddr, c->raddr); 976 v6tov4(uh->udpsrc, c->laddr); 977 hnputs(uh->udplen, ptcllen); 978 979 if(hangup) 980 hnputl(rh->relsgen, Hangupgen); 981 else 982 hnputl(rh->relsgen, r->sndgen); 983 hnputl(rh->relseq, 0); 984 hnputl(rh->relagen, r->rcvgen); 985 hnputl(rh->relack, r->rcvseq); 986 987 if(r->acksent < r->rcvseq) 988 r->acksent = r->rcvseq; 989 990 uh->udpcksum[0] = 0; 991 uh->udpcksum[1] = 0; 992 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE)); 993 994 DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen); 995 doipoput(c, f, bp, 0, c->ttl, c->tos); 996 } 997 998 999 /* 1000 * called with ucb locked (and c locked if user initiated close) 1001 */ 1002 void 1003 relhangup(Conv *c, Reliable *r) 1004 { 1005 int n; 1006 Block *bp; 1007 char hup[ERRMAX]; 1008 1009 n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port); 1010 qproduce(c->eq, hup, n); 1011 1012 /* 1013 * dump any unacked outgoing messages 1014 */ 1015 for(bp = r->unacked; bp != nil; bp = r->unacked){ 1016 r->unacked = bp->list; 1017 bp->list = nil; 1018 freeb(bp); 1019 } 1020 1021 r->rcvgen = 0; 1022 r->rcvseq = 0; 1023 r->acksent = 0; 1024 if(generation == Hangupgen) 1025 generation++; 1026 r->sndgen = generation++; 1027 r->sndseq = 0; 1028 r->ackrcvd = 0; 1029 r->xmits = 0; 1030 r->timeout = 0; 1031 wakeup(&r->vous); 1032 } 1033 1034 /* 1035 * called with ucb locked 1036 */ 1037 void 1038 relrexmit(Conv *c, Reliable *r) 1039 { 1040 Rudppriv *upriv; 1041 Block *np; 1042 Fs *f; 1043 1044 upriv = c->p->priv; 1045 f = c->p->f; 1046 r->timeout = 0; 1047 if(r->xmits++ > Rudpmaxxmit){ 1048 relhangup(c, r); 1049 return; 1050 } 1051 1052 upriv->rxmits++; 1053 np = copyblock(r->unacked, blocklen(r->unacked)); 1054 DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1); 1055 doipoput(c, f, np, 0, c->ttl, c->tos); 1056 } 1057