1 /*
2 * Reliable User Datagram Protocol, currently only for IPv4.
3 * This protocol is compatible with UDP's packet format.
4 * It could be done over UDP if need be.
5 */
6 #include "u.h"
7 #include "../port/lib.h"
8 #include "mem.h"
9 #include "dat.h"
10 #include "fns.h"
11 #include "../port/error.h"
12
13 #include "ip.h"
14
#define DEBUG	0
/*
 * DPRINT(fmt, ...) prints only when DEBUG is non-zero.  The
 * if(!DEBUG){}else form (instead of a bare if) keeps an `else`
 * written after a DPRINT statement from binding to the macro's
 * hidden `if`.
 */
#define DPRINT	if(!DEBUG){}else print

/*
 * Sequence numbers run 1..0xffffffff: 0 means "no message" (a pure
 * ack) and NEXTSEQ skips it on wraparound.  These macros assume a
 * 32-bit ulong, as on Plan 9.
 *
 * SEQDIFF(a,b): forward distance from b to a in that space.
 */
#define SEQDIFF(a,b) ( (a)>=(b)?\
			(a)-(b):\
			0xffffffffUL-((b)-(a)) )
/* INSEQ(a,start,end): a lies in the window (start, end], allowing for wraparound */
#define INSEQ(a,start,end) ( (start)<=(end)?\
			((a)>(start)&&(a)<=(end)):\
			((a)>(start)||(a)<=(end)) )
/* UNACKED(r): number of packets sent to r that the peer has not acked yet */
#define UNACKED(r) SEQDIFF((r)->sndseq, (r)->ackrcvd)
/* NEXTSEQ(a): successor of sequence number a, skipping 0 on wraparound */
#define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
26
/*
 * Header sizes and protocol tunables.  Sizes are in bytes and match
 * the Udphdr/Rudphdr overlays below.
 */
enum
{
	UDP_PHDRSIZE = 12,	/* pseudo header: Unused+udpproto+udpplen+udpsrc+udpdst */
//	UDP_HDRSIZE = 20,	/* pseudo header + udp header */
	UDP_RHDRSIZE = 36,	/* pseudo header + udp header (8) + rudp header (16) */
	UDP_IPHDR = 8,	/* ip header bytes that precede the pseudo header in the overlays */
	IP_UDPPROTO = 254,	/* IP protocol number used for rudp (not UDP's 17) — presumably chosen from the experimental range; confirm */
	UDP_USEAD7 = 52,	/* size of new ipv6 headers struct: raddr, laddr, ifc addr (3*IPaddrlen) + rport + lport */

	Rudprxms = 200,	/* retransmit when the head packet has waited more than Rudprxms*xmits ms */
	Rudptickms = 50,	/* period of relackproc's retransmit/ack scan, in ms */
	Rudpmaxxmit = 10,	/* relrexmit hangs up once a packet has been sent more than this many times */
	Maxunacked = 100,	/* rudpkick blocks the writer when more packets than this are unacked */
};

#define Hangupgen	0xffffffff	/* used only in hangup messages; never assigned as a real generation */
43
/*
 * Overlay for a UDP-format packet starting at bp->rp: the last
 * UDP_IPHDR bytes of the ip header as named below, then the pseudo
 * header used for the checksum, then the udp header.  On input,
 * rudpiput saves Unused (as "ottl") and udpplen (as "olen") and
 * restores them before icmpnoconv — presumably those bytes hold the
 * IPv4 ttl and header checksum on arrival; confirm against the ip layer.
 */
typedef struct Udphdr Udphdr;
struct Udphdr
{
	/* ip header */
	uchar	vihl;		/* Version and header length */
	uchar	tos;		/* Type of service */
	uchar	length[2];	/* packet length */
	uchar	id[2];		/* Identification */
	uchar	frag[2];	/* Fragment information */

	/* pseudo header starts here */
	uchar	Unused;		/* zeroed for the checksum */
	uchar	udpproto;	/* Protocol */
	uchar	udpplen[2];	/* Header plus data length */
	uchar	udpsrc[4];	/* Ip source */
	uchar	udpdst[4];	/* Ip destination */

	/* udp header */
	uchar	udpsport[2];	/* Source port */
	uchar	udpdport[2];	/* Destination port */
	uchar	udplen[2];	/* udp header plus data length (rudpkick stores dlen + UDP_RHDRSIZE-UDP_PHDRSIZE) */
	uchar	udpcksum[2];	/* Checksum (0 = not checked on input) */
};
67
/*
 * Udphdr followed by the 16-byte rudp header.  Every rudp packet,
 * including a pure ack, carries the full header.
 */
typedef struct Rudphdr Rudphdr;
struct Rudphdr
{
	/* ip header */
	uchar	vihl;		/* Version and header length */
	uchar	tos;		/* Type of service */
	uchar	length[2];	/* packet length */
	uchar	id[2];		/* Identification */
	uchar	frag[2];	/* Fragment information */

	/* pseudo header starts here */
	uchar	Unused;
	uchar	udpproto;	/* Protocol */
	uchar	udpplen[2];	/* Header plus data length */
	uchar	udpsrc[4];	/* Ip source */
	uchar	udpdst[4];	/* Ip destination */

	/* udp header */
	uchar	udpsport[2];	/* Source port */
	uchar	udpdport[2];	/* Destination port */
	uchar	udplen[2];	/* data length (includes rudp header) */
	uchar	udpcksum[2];	/* Checksum */

	/* rudp header */
	uchar	relseq[4];	/* id of this packet (or 0 for a pure ack) */
	uchar	relsgen[4];	/* sender's generation, or Hangupgen in a hangup */
	uchar	relack[4];	/* last in-order packet received from the peer (or 0) */
	uchar	relagen[4];	/* generation of the packets being acked */
};
97
98
/*
 * one state structure per destination (addr!port) of a conversation,
 * kept on the Rudpcb's r list and found or created by relstate.
 */
typedef struct Reliable Reliable;
struct Reliable
{
	Ref;			/* one ref for being on the list plus one per relstate caller; relput frees at 0 */

	Reliable *next;		/* next destination on the Rudpcb's list */

	uchar	addr[IPaddrlen];	/* always V6 when put here */
	ushort	port;

	Block	*unacked;	/* unacked msg list, linked through Block.list */
	Block	*unackedtail;	/* and its tail */

	int	timeout;	/* ms counted by relackproc since the head of unacked was last (re)sent */
	int	xmits;		/* number of times the head of unacked has been sent */

	ulong	sndseq;		/* seq of the last packet sent (rudpkick advances it before use) */
	ulong	sndgen;		/* and its generation */

	ulong	rcvseq;		/* last packet received in order */
	ulong	rcvgen;		/* and its generation (0 = none yet) */

	ulong	acksent;	/* last rcvseq for which an ack went out */
	ulong	ackrcvd;	/* last msg for which ack was rcvd */

	/* flow control */
	QLock	lock;		/* serializes rudpkick's flow-control wait */
	Rendez	vous;		/* rudpkick sleeps here; reliput and relhangup wake it */
	int	blocked;	/* set while a writer is asleep on vous */
};
132
133
134
/* MIB II counters (reported by rudpstats) */
typedef struct Rudpstats Rudpstats;
struct Rudpstats
{
	ulong	rudpInDatagrams;	/* packets handed to rudpiput */
	ulong	rudpNoPorts;		/* packets with no matching conversation */
	ulong	rudpInErrors;		/* packets dropped for errors (e.g. bad checksum) */
	ulong	rudpOutDatagrams;	/* data packets sent by rudpkick */
};

/*
 * Per-protocol private state, hung off Proto.priv.
 */
typedef struct Rudppriv Rudppriv;
struct Rudppriv
{
	Ipht	ht;		/* conversation lookup table (iphtadd/iphtlook/iphtrem) */

	/* MIB counters */
	Rudpstats	ustats;

	/* non-MIB stats */
	ulong	csumerr;	/* checksum errors */
	ulong	lenerr;		/* short packet */
	ulong	rxmits;		/* # of retransmissions */
	ulong	orders;		/* # of out of order pkts */

	/* keeping track of the ack kproc (see rudpstartackproc) */
	int	ackprocstarted;
	QLock	apl;
};
163
164
/*
 * Source of send generations handed out by relstate and relhangup;
 * seeded from rand() on first use.
 */
static ulong generation = 0;
static Rendez rend;	/* NOTE(review): not referenced in the code shown here */

/*
 * protocol specific part of Conv
 */
typedef struct Rudpcb Rudpcb;
struct Rudpcb
{
	QLock;			/* held while the r list and its Reliables are examined or changed */
	uchar	headers;	/* 7 after the "headers" ctl: messages carry a UDP_USEAD7 address header */
	uchar	randdrop;	/* percentage of outgoing packets doipoput discards */
	Reliable *r;		/* per-destination reliable state */
};
179
180 /*
181 * local functions
182 */
183 void relsendack(Conv*, Reliable*, int);
184 int reliput(Conv*, Block*, uchar*, ushort);
185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
186 void relput(Reliable*);
187 void relforget(Conv *, uchar*, int, int);
188 void relackproc(void *);
189 void relackq(Reliable *, Block*);
190 void relhangup(Conv *, Reliable*);
191 void relrexmit(Conv *, Reliable*);
192 void relput(Reliable*);
193 void rudpkick(void *x);
194
195 static void
rudpstartackproc(Proto * rudp)196 rudpstartackproc(Proto *rudp)
197 {
198 Rudppriv *rpriv;
199 char kpname[KNAMELEN];
200
201 rpriv = rudp->priv;
202 if(rpriv->ackprocstarted == 0){
203 qlock(&rpriv->apl);
204 if(rpriv->ackprocstarted == 0){
205 snprint(kpname, sizeof kpname, "#I%drudpack",
206 rudp->f->dev);
207 kproc(kpname, relackproc, rudp);
208 rpriv->ackprocstarted = 1;
209 }
210 qunlock(&rpriv->apl);
211 }
212 }
213
214 static char*
rudpconnect(Conv * c,char ** argv,int argc)215 rudpconnect(Conv *c, char **argv, int argc)
216 {
217 char *e;
218 Rudppriv *upriv;
219
220 upriv = c->p->priv;
221 rudpstartackproc(c->p);
222 e = Fsstdconnect(c, argv, argc);
223 Fsconnected(c, e);
224 iphtadd(&upriv->ht, c);
225
226 return e;
227 }
228
229
230 static int
rudpstate(Conv * c,char * state,int n)231 rudpstate(Conv *c, char *state, int n)
232 {
233 Rudpcb *ucb;
234 Reliable *r;
235 int m;
236
237 m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
238 ucb = (Rudpcb*)c->ptcl;
239 qlock(ucb);
240 for(r = ucb->r; r; r = r->next)
241 m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
242 m += snprint(state+m, n-m, "\n");
243 qunlock(ucb);
244 return m;
245 }
246
247 static char*
rudpannounce(Conv * c,char ** argv,int argc)248 rudpannounce(Conv *c, char** argv, int argc)
249 {
250 char *e;
251 Rudppriv *upriv;
252
253 upriv = c->p->priv;
254 rudpstartackproc(c->p);
255 e = Fsstdannounce(c, argv, argc);
256 if(e != nil)
257 return e;
258 Fsconnected(c, nil);
259 iphtadd(&upriv->ht, c);
260
261 return nil;
262 }
263
264 static void
rudpcreate(Conv * c)265 rudpcreate(Conv *c)
266 {
267 c->rq = qopen(64*1024, Qmsg, 0, 0);
268 c->wq = qopen(64*1024, Qkick, rudpkick, c);
269 }
270
271 static void
rudpclose(Conv * c)272 rudpclose(Conv *c)
273 {
274 Rudpcb *ucb;
275 Reliable *r, *nr;
276 Rudppriv *upriv;
277
278 upriv = c->p->priv;
279 iphtrem(&upriv->ht, c);
280
281 /* force out any delayed acks */
282 ucb = (Rudpcb*)c->ptcl;
283 qlock(ucb);
284 for(r = ucb->r; r; r = r->next){
285 if(r->acksent != r->rcvseq)
286 relsendack(c, r, 0);
287 }
288 qunlock(ucb);
289
290 qclose(c->rq);
291 qclose(c->wq);
292 qclose(c->eq);
293 ipmove(c->laddr, IPnoaddr);
294 ipmove(c->raddr, IPnoaddr);
295 c->lport = 0;
296 c->rport = 0;
297
298 ucb->headers = 0;
299 ucb->randdrop = 0;
300 qlock(ucb);
301 for(r = ucb->r; r; r = nr){
302 if(r->acksent != r->rcvseq)
303 relsendack(c, r, 0);
304 nr = r->next;
305 relhangup(c, r);
306 relput(r);
307 }
308 ucb->r = 0;
309
310 qunlock(ucb);
311 }
312
313 /*
314 * randomly don't send packets
315 */
316 static void
doipoput(Conv * c,Fs * f,Block * bp,int x,int ttl,int tos)317 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
318 {
319 Rudpcb *ucb;
320
321 ucb = (Rudpcb*)c->ptcl;
322 if(ucb->randdrop && nrand(100) < ucb->randdrop)
323 freeblist(bp);
324 else
325 ipoput4(f, bp, x, ttl, tos, nil);
326 }
327
328 int
flow(void * v)329 flow(void *v)
330 {
331 Reliable *r = v;
332
333 return UNACKED(r) <= Maxunacked;
334 }
335
336 void
rudpkick(void * x)337 rudpkick(void *x)
338 {
339 Conv *c = x;
340 Udphdr *uh;
341 ushort rport;
342 uchar laddr[IPaddrlen], raddr[IPaddrlen];
343 Block *bp;
344 Rudpcb *ucb;
345 Rudphdr *rh;
346 Reliable *r;
347 int dlen, ptcllen;
348 Rudppriv *upriv;
349 Fs *f;
350
351 upriv = c->p->priv;
352 f = c->p->f;
353
354 netlog(c->p->f, Logrudp, "rudp: kick\n");
355 bp = qget(c->wq);
356 if(bp == nil)
357 return;
358
359 ucb = (Rudpcb*)c->ptcl;
360 switch(ucb->headers) {
361 case 7:
362 /* get user specified addresses */
363 bp = pullupblock(bp, UDP_USEAD7);
364 if(bp == nil)
365 return;
366 ipmove(raddr, bp->rp);
367 bp->rp += IPaddrlen;
368 ipmove(laddr, bp->rp);
369 bp->rp += IPaddrlen;
370 /* pick interface closest to dest */
371 if(ipforme(f, laddr) != Runi)
372 findlocalip(f, laddr, raddr);
373 bp->rp += IPaddrlen; /* Ignore ifc address */
374 rport = nhgets(bp->rp);
375 bp->rp += 2+2; /* Ignore local port */
376 break;
377 default:
378 ipmove(raddr, c->raddr);
379 ipmove(laddr, c->laddr);
380 rport = c->rport;
381 break;
382 }
383
384 dlen = blocklen(bp);
385
386 /* Make space to fit rudp & ip header */
387 bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
388 if(bp == nil)
389 return;
390
391 uh = (Udphdr *)(bp->rp);
392 uh->vihl = IP_VER4;
393
394 rh = (Rudphdr*)uh;
395
396 ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
397 uh->Unused = 0;
398 uh->udpproto = IP_UDPPROTO;
399 uh->frag[0] = 0;
400 uh->frag[1] = 0;
401 hnputs(uh->udpplen, ptcllen);
402 switch(ucb->headers){
403 case 7:
404 v6tov4(uh->udpdst, raddr);
405 hnputs(uh->udpdport, rport);
406 v6tov4(uh->udpsrc, laddr);
407 break;
408 default:
409 v6tov4(uh->udpdst, c->raddr);
410 hnputs(uh->udpdport, c->rport);
411 if(ipcmp(c->laddr, IPnoaddr) == 0)
412 findlocalip(f, c->laddr, c->raddr);
413 v6tov4(uh->udpsrc, c->laddr);
414 break;
415 }
416 hnputs(uh->udpsport, c->lport);
417 hnputs(uh->udplen, ptcllen);
418 uh->udpcksum[0] = 0;
419 uh->udpcksum[1] = 0;
420
421 qlock(ucb);
422 r = relstate(ucb, raddr, rport, "kick");
423 r->sndseq = NEXTSEQ(r->sndseq);
424 hnputl(rh->relseq, r->sndseq);
425 hnputl(rh->relsgen, r->sndgen);
426
427 hnputl(rh->relack, r->rcvseq); /* ACK last rcvd packet */
428 hnputl(rh->relagen, r->rcvgen);
429
430 if(r->rcvseq != r->acksent)
431 r->acksent = r->rcvseq;
432
433 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
434
435 relackq(r, bp);
436 qunlock(ucb);
437
438 upriv->ustats.rudpOutDatagrams++;
439
440 DPRINT("sent: %lud/%lud, %lud/%lud\n",
441 r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
442
443 doipoput(c, f, bp, 0, c->ttl, c->tos);
444
445 if(waserror()) {
446 relput(r);
447 qunlock(&r->lock);
448 nexterror();
449 }
450
451 /* flow control of sorts */
452 qlock(&r->lock);
453 if(UNACKED(r) > Maxunacked){
454 r->blocked = 1;
455 sleep(&r->vous, flow, r);
456 r->blocked = 0;
457 }
458
459 qunlock(&r->lock);
460 relput(r);
461 poperror();
462 }
463
464 void
rudpiput(Proto * rudp,Ipifc * ifc,Block * bp)465 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
466 {
467 int len, olen, ottl;
468 Udphdr *uh;
469 Conv *c;
470 Rudpcb *ucb;
471 uchar raddr[IPaddrlen], laddr[IPaddrlen];
472 ushort rport, lport;
473 Rudppriv *upriv;
474 Fs *f;
475 uchar *p;
476
477 upriv = rudp->priv;
478 f = rudp->f;
479
480 upriv->ustats.rudpInDatagrams++;
481
482 uh = (Udphdr*)(bp->rp);
483
484 /* Put back pseudo header for checksum
485 * (remember old values for icmpnoconv())
486 */
487 ottl = uh->Unused;
488 uh->Unused = 0;
489 len = nhgets(uh->udplen);
490 olen = nhgets(uh->udpplen);
491 hnputs(uh->udpplen, len);
492
493 v4tov6(raddr, uh->udpsrc);
494 v4tov6(laddr, uh->udpdst);
495 lport = nhgets(uh->udpdport);
496 rport = nhgets(uh->udpsport);
497
498 if(nhgets(uh->udpcksum)) {
499 if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
500 upriv->ustats.rudpInErrors++;
501 upriv->csumerr++;
502 netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
503 DPRINT("rudp: checksum error %I\n", raddr);
504 freeblist(bp);
505 return;
506 }
507 }
508
509 qlock(rudp);
510
511 c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
512 if(c == nil){
513 /* no conversation found */
514 upriv->ustats.rudpNoPorts++;
515 qunlock(rudp);
516 netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
517 laddr, lport);
518 uh->Unused = ottl;
519 hnputs(uh->udpplen, olen);
520 icmpnoconv(f, bp);
521 freeblist(bp);
522 return;
523 }
524 ucb = (Rudpcb*)c->ptcl;
525 qlock(ucb);
526 qunlock(rudp);
527
528 if(reliput(c, bp, raddr, rport) < 0){
529 qunlock(ucb);
530 freeb(bp);
531 return;
532 }
533
534 /*
535 * Trim the packet down to data size
536 */
537
538 len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
539 bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
540 if(bp == nil) {
541 netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
542 raddr, rport, laddr, lport);
543 DPRINT("rudp: len err %I.%d -> %I.%d\n",
544 raddr, rport, laddr, lport);
545 upriv->lenerr++;
546 return;
547 }
548
549 netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
550 raddr, rport, laddr, lport, len);
551
552 switch(ucb->headers){
553 case 7:
554 /* pass the src address */
555 bp = padblock(bp, UDP_USEAD7);
556 p = bp->rp;
557 ipmove(p, raddr); p += IPaddrlen;
558 ipmove(p, laddr); p += IPaddrlen;
559 ipmove(p, ifc->lifc->local); p += IPaddrlen;
560 hnputs(p, rport); p += 2;
561 hnputs(p, lport);
562 break;
563 default:
564 /* connection oriented rudp */
565 if(ipcmp(c->raddr, IPnoaddr) == 0){
566 /* save the src address in the conversation */
567 ipmove(c->raddr, raddr);
568 c->rport = rport;
569
570 /* reply with the same ip address (if not broadcast) */
571 if(ipforme(f, laddr) == Runi)
572 ipmove(c->laddr, laddr);
573 else
574 v4tov6(c->laddr, ifc->lifc->local);
575 }
576 break;
577 }
578 if(bp->next)
579 bp = concatblock(bp);
580
581 if(qfull(c->rq)) {
582 netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
583 laddr, lport);
584 freeblist(bp);
585 }
586 else
587 qpass(c->rq, bp);
588
589 qunlock(ucb);
590 }
591
592 static char *rudpunknown = "unknown rudp ctl request";
593
594 char*
rudpctl(Conv * c,char ** f,int n)595 rudpctl(Conv *c, char **f, int n)
596 {
597 Rudpcb *ucb;
598 uchar ip[IPaddrlen];
599 int x;
600
601 ucb = (Rudpcb*)c->ptcl;
602 if(n < 1)
603 return rudpunknown;
604
605 if(strcmp(f[0], "headers") == 0){
606 ucb->headers = 7; /* new headers format */
607 return nil;
608 } else if(strcmp(f[0], "hangup") == 0){
609 if(n < 3)
610 return "bad syntax";
611 if (parseip(ip, f[1]) == -1)
612 return Ebadip;
613 x = atoi(f[2]);
614 qlock(ucb);
615 relforget(c, ip, x, 1);
616 qunlock(ucb);
617 return nil;
618 } else if(strcmp(f[0], "randdrop") == 0){
619 x = 10; /* default is 10% */
620 if(n > 1)
621 x = atoi(f[1]);
622 if(x > 100 || x < 0)
623 return "illegal rudp drop rate";
624 ucb->randdrop = x;
625 return nil;
626 }
627 return rudpunknown;
628 }
629
630 void
rudpadvise(Proto * rudp,Block * bp,char * msg)631 rudpadvise(Proto *rudp, Block *bp, char *msg)
632 {
633 Udphdr *h;
634 uchar source[IPaddrlen], dest[IPaddrlen];
635 ushort psource, pdest;
636 Conv *s, **p;
637
638 h = (Udphdr*)(bp->rp);
639
640 v4tov6(dest, h->udpdst);
641 v4tov6(source, h->udpsrc);
642 psource = nhgets(h->udpsport);
643 pdest = nhgets(h->udpdport);
644
645 /* Look for a connection */
646 for(p = rudp->conv; *p; p++) {
647 s = *p;
648 if(s->rport == pdest)
649 if(s->lport == psource)
650 if(ipcmp(s->raddr, dest) == 0)
651 if(ipcmp(s->laddr, source) == 0){
652 qhangup(s->rq, msg);
653 qhangup(s->wq, msg);
654 break;
655 }
656 }
657 freeblist(bp);
658 }
659
660 int
rudpstats(Proto * rudp,char * buf,int len)661 rudpstats(Proto *rudp, char *buf, int len)
662 {
663 Rudppriv *upriv;
664
665 upriv = rudp->priv;
666 return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
667 upriv->ustats.rudpInDatagrams,
668 upriv->ustats.rudpNoPorts,
669 upriv->ustats.rudpInErrors,
670 upriv->ustats.rudpOutDatagrams,
671 upriv->rxmits,
672 upriv->orders);
673 }
674
675 void
rudpinit(Fs * fs)676 rudpinit(Fs *fs)
677 {
678
679 Proto *rudp;
680
681 rudp = smalloc(sizeof(Proto));
682 rudp->priv = smalloc(sizeof(Rudppriv));
683 rudp->name = "rudp";
684 rudp->connect = rudpconnect;
685 rudp->announce = rudpannounce;
686 rudp->ctl = rudpctl;
687 rudp->state = rudpstate;
688 rudp->create = rudpcreate;
689 rudp->close = rudpclose;
690 rudp->rcv = rudpiput;
691 rudp->advise = rudpadvise;
692 rudp->stats = rudpstats;
693 rudp->ipproto = IP_UDPPROTO;
694 rudp->nc = 32;
695 rudp->ptclsize = sizeof(Rudpcb);
696
697 Fsproto(fs, rudp);
698 }
699
700 /*********************************************/
701 /* Here starts the reliable helper functions */
702 /*********************************************/
703 /*
704 * Enqueue a copy of an unacked block for possible retransmissions
705 */
706 void
relackq(Reliable * r,Block * bp)707 relackq(Reliable *r, Block *bp)
708 {
709 Block *np;
710
711 np = copyblock(bp, blocklen(bp));
712 if(r->unacked)
713 r->unackedtail->list = np;
714 else {
715 /* restart timer */
716 r->timeout = 0;
717 r->xmits = 1;
718 r->unacked = np;
719 }
720 r->unackedtail = np;
721 np->list = nil;
722 }
723
724 /*
725 * retransmit unacked blocks
726 */
727 void
relackproc(void * a)728 relackproc(void *a)
729 {
730 Rudpcb *ucb;
731 Proto *rudp;
732 Reliable *r;
733 Conv **s, *c;
734
735 rudp = (Proto *)a;
736
737 loop:
738 tsleep(&up->sleep, return0, 0, Rudptickms);
739
740 for(s = rudp->conv; *s; s++) {
741 c = *s;
742 ucb = (Rudpcb*)c->ptcl;
743 qlock(ucb);
744
745 for(r = ucb->r; r; r = r->next) {
746 if(r->unacked != nil){
747 r->timeout += Rudptickms;
748 if(r->timeout > Rudprxms*r->xmits)
749 relrexmit(c, r);
750 }
751 if(r->acksent != r->rcvseq)
752 relsendack(c, r, 0);
753 }
754 qunlock(ucb);
755 }
756 goto loop;
757 }
758
759 /*
760 * get the state record for a conversation
761 */
762 Reliable*
relstate(Rudpcb * ucb,uchar * addr,ushort port,char * from)763 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
764 {
765 Reliable *r, **l;
766
767 l = &ucb->r;
768 for(r = *l; r; r = *l){
769 if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
770 port == r->port)
771 break;
772 l = &r->next;
773 }
774
775 /* no state for this addr/port, create some */
776 if(r == nil){
777 while(generation == 0)
778 generation = rand();
779
780 DPRINT("from %s new state %lud for %I!%ud\n",
781 from, generation, addr, port);
782
783 r = smalloc(sizeof(Reliable));
784 memmove(r->addr, addr, IPaddrlen);
785 r->port = port;
786 r->unacked = 0;
787 if(generation == Hangupgen)
788 generation++;
789 r->sndgen = generation++;
790 r->sndseq = 0;
791 r->ackrcvd = 0;
792 r->rcvgen = 0;
793 r->rcvseq = 0;
794 r->acksent = 0;
795 r->xmits = 0;
796 r->timeout = 0;
797 r->ref = 0;
798 incref(r); /* one reference for being in the list */
799
800 *l = r;
801 }
802
803 incref(r);
804 return r;
805 }
806
807 void
relput(Reliable * r)808 relput(Reliable *r)
809 {
810 if(decref(r) == 0)
811 free(r);
812 }
813
814 /*
815 * forget a Reliable state
816 */
817 void
relforget(Conv * c,uchar * ip,int port,int originator)818 relforget(Conv *c, uchar *ip, int port, int originator)
819 {
820 Rudpcb *ucb;
821 Reliable *r, **l;
822
823 ucb = (Rudpcb*)c->ptcl;
824
825 l = &ucb->r;
826 for(r = *l; r; r = *l){
827 if(ipcmp(ip, r->addr) == 0 && port == r->port){
828 *l = r->next;
829 if(originator)
830 relsendack(c, r, 1);
831 relhangup(c, r);
832 relput(r); /* remove from the list */
833 break;
834 }
835 l = &r->next;
836 }
837 }
838
/*
 * Process the rudp header of a received packet from addr!port on
 * conversation c (bp->rp at the Rudphdr overlay).  In order:
 *   - acks for a generation other than our current sndgen are ignored;
 *   - a hangup (relsgen == Hangupgen) for our generation forgets the state;
 *   - a change in the peer's generation is treated as a new connection;
 *   - acks free packets from the unacked list and may wake a writer;
 *   - finally the packet's own sequence number is checked.
 *
 * Returns 0 if the packet is the next in-order message and should be
 * passed to the user process, -1 otherwise (pure ack, duplicate or
 * out-of-order packet, hangup, stale generation, or full read queue).
 * bp is not freed here.
 *
 * called with ucb locked.
 */
int
reliput(Conv *c, Block *bp, uchar *addr, ushort port)
{
	Block *nbp;
	Rudpcb *ucb;
	Rudppriv *upriv;
	Udphdr *uh;
	Reliable *r;
	Rudphdr *rh;
	ulong seq, ack, sgen, agen, ackreal;
	int rv = -1;

	/* get fields */
	uh = (Udphdr*)(bp->rp);
	rh = (Rudphdr*)uh;
	seq = nhgetl(rh->relseq);
	sgen = nhgetl(rh->relsgen);
	ack = nhgetl(rh->relack);
	agen = nhgetl(rh->relagen);

	upriv = c->p->priv;
	ucb = (Rudpcb*)c->ptcl;
	r = relstate(ucb, addr, port, "input");	/* reference dropped at out: */

	DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
		seq, sgen, ack, agen, r->sndgen);

	/* if acking an incorrect generation, ignore */
	if(ack && agen != r->sndgen)
		goto out;

	/* Look for a hangup */
	if(sgen == Hangupgen) {
		if(agen == r->sndgen)
			relforget(c, addr, port, 0);
		goto out;
	}

	/* make sure we're not talking to a new remote side */
	if(r->rcvgen != sgen){
		/* only a pure ack (0) or a first packet (1) may begin a new generation */
		if(seq != 0 && seq != 1)
			goto out;

		/* new connection */
		if(r->rcvgen != 0){
			DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
			relhangup(c, r);
		}
		r->rcvgen = sgen;
	}

	/* dequeue acked packets: everything in (ackrcvd, ack] */
	if(ack && agen == r->sndgen){
		ackreal = 0;
		while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
			nbp = r->unacked;
			r->unacked = nbp->list;
			DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
			       ack, agen, r->sndgen);
			freeb(nbp);
			r->ackrcvd = NEXTSEQ(r->ackrcvd);
			ackreal = 1;
		}

		/* flow control: wake a writer blocked in rudpkick once the backlog is small */
		if(UNACKED(r) < Maxunacked/8 && r->blocked)
			wakeup(&r->vous);

		/*
		 * retransmit next packet if the acked packet
		 * was transmitted more than once
		 */
		if(ackreal && r->unacked != nil){
			r->timeout = 0;
			if(r->xmits > 1){
				r->xmits = 1;
				relrexmit(c, r);
			}
		}
		
	}

	/*
	 * no message or input queue full.  A full queue drops the packet
	 * without advancing rcvseq, so it is not acked and the sender's
	 * retransmit timer should resend it.
	 */
	if(seq == 0 || qfull(c->rq))
		goto out;

	/* refuse out of order delivery */
	if(seq != NEXTSEQ(r->rcvseq)){
		relsendack(c, r, 0);	/* tell him we got it already (acks rcvseq) */
		upriv->orders++;
		DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
		goto out;
	}
	r->rcvseq = seq;

	rv = 0;
out:
	relput(r);
	return rv;
}
945
946 void
relsendack(Conv * c,Reliable * r,int hangup)947 relsendack(Conv *c, Reliable *r, int hangup)
948 {
949 Udphdr *uh;
950 Block *bp;
951 Rudphdr *rh;
952 int ptcllen;
953 Fs *f;
954
955 bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
956 if(bp == nil)
957 return;
958 bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
959 f = c->p->f;
960 uh = (Udphdr *)(bp->rp);
961 uh->vihl = IP_VER4;
962 rh = (Rudphdr*)uh;
963
964 ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
965 uh->Unused = 0;
966 uh->udpproto = IP_UDPPROTO;
967 uh->frag[0] = 0;
968 uh->frag[1] = 0;
969 hnputs(uh->udpplen, ptcllen);
970
971 v6tov4(uh->udpdst, r->addr);
972 hnputs(uh->udpdport, r->port);
973 hnputs(uh->udpsport, c->lport);
974 if(ipcmp(c->laddr, IPnoaddr) == 0)
975 findlocalip(f, c->laddr, c->raddr);
976 v6tov4(uh->udpsrc, c->laddr);
977 hnputs(uh->udplen, ptcllen);
978
979 if(hangup)
980 hnputl(rh->relsgen, Hangupgen);
981 else
982 hnputl(rh->relsgen, r->sndgen);
983 hnputl(rh->relseq, 0);
984 hnputl(rh->relagen, r->rcvgen);
985 hnputl(rh->relack, r->rcvseq);
986
987 if(r->acksent < r->rcvseq)
988 r->acksent = r->rcvseq;
989
990 uh->udpcksum[0] = 0;
991 uh->udpcksum[1] = 0;
992 hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
993
994 DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
995 doipoput(c, f, bp, 0, c->ttl, c->tos);
996 }
997
998
999 /*
1000 * called with ucb locked (and c locked if user initiated close)
1001 */
1002 void
relhangup(Conv * c,Reliable * r)1003 relhangup(Conv *c, Reliable *r)
1004 {
1005 int n;
1006 Block *bp;
1007 char hup[ERRMAX];
1008
1009 n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
1010 qproduce(c->eq, hup, n);
1011
1012 /*
1013 * dump any unacked outgoing messages
1014 */
1015 for(bp = r->unacked; bp != nil; bp = r->unacked){
1016 r->unacked = bp->list;
1017 bp->list = nil;
1018 freeb(bp);
1019 }
1020
1021 r->rcvgen = 0;
1022 r->rcvseq = 0;
1023 r->acksent = 0;
1024 if(generation == Hangupgen)
1025 generation++;
1026 r->sndgen = generation++;
1027 r->sndseq = 0;
1028 r->ackrcvd = 0;
1029 r->xmits = 0;
1030 r->timeout = 0;
1031 wakeup(&r->vous);
1032 }
1033
1034 /*
1035 * called with ucb locked
1036 */
1037 void
relrexmit(Conv * c,Reliable * r)1038 relrexmit(Conv *c, Reliable *r)
1039 {
1040 Rudppriv *upriv;
1041 Block *np;
1042 Fs *f;
1043
1044 upriv = c->p->priv;
1045 f = c->p->f;
1046 r->timeout = 0;
1047 if(r->xmits++ > Rudpmaxxmit){
1048 relhangup(c, r);
1049 return;
1050 }
1051
1052 upriv->rxmits++;
1053 np = copyblock(r->unacked, blocklen(r->unacked));
1054 DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
1055 doipoput(c, f, np, 0, c->ttl, c->tos);
1056 }
1057