xref: /plan9/sys/src/9/ip/rudp.c (revision 4e3613ab15c331a9ada113286cc0f2a35bc0373d)
1 /*
2  *  Reliable User Datagram Protocol, currently only for IPv4.
3  *  This protocol is compatible with UDP's packet format.
4  *  It could be done over UDP if need be.
5  */
6 #include	"u.h"
7 #include	"../port/lib.h"
8 #include	"mem.h"
9 #include	"dat.h"
10 #include	"fns.h"
11 #include	"../port/error.h"
12 
13 #include	"ip.h"
14 
/*
 *  Debug tracing: DPRINT(fmt, ...) calls print() only when DEBUG is
 *  non-zero.  The expansion supplies its own else arm so that an
 *  `else' following a DPRINT statement in the caller binds to the
 *  caller's if and not to the if hidden inside the macro.
 */
#define DEBUG	0
#define DPRINT	if(!DEBUG){}else print
17 
/*
 *  Sequence number arithmetic.
 *
 *  SEQDIFF(a,b)	distance from b forward to a; when a is numerically
 *			below b the result is 0xffffffff-(b-a).
 *  INSEQ(a,s,e)	non-zero when a lies in the interval (s, e], where
 *			the interval may wrap (e numerically below s).
 *  UNACKED(r)		SEQDIFF between r->sndseq and r->ackrcvd; r is any
 *			pointer expression to a structure with those fields.
 *  NEXTSEQ(a)		a+1, except that a result of 0 is replaced by 1.
 *
 *  Each macro evaluates its arguments more than once; do not pass
 *  expressions with side effects.
 */
#define SEQDIFF(a,b) ( (a)>=(b)?\
			(a)-(b):\
			0xffffffffUL-((b)-(a)) )
#define INSEQ(a,start,end) ( (start)<=(end)?\
				((a)>(start)&&(a)<=(end)):\
				((a)>(start)||(a)<=(end)) )
#define UNACKED(r) SEQDIFF((r)->sndseq, (r)->ackrcvd)
#define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
26 
/*
 *  Header geometry (byte counts within Udphdr/Rudphdr) and the
 *  tuning constants of the reliability layer.
 */
enum
{
	UDP_PHDRSIZE	= 12,	/* pseudo header */
//	UDP_HDRSIZE	= 20,	/* pseudo header + udp header */
	UDP_RHDRSIZE	= 36,	/* pseudo header + udp header + rudp header */
	UDP_IPHDR	= 8,	/* ip header bytes that precede the pseudo header */
	IP_UDPPROTO	= 254,	/* stored in the udpproto byte and in Proto.ipproto */
	UDP_USEAD7	= 52,	/* size of new ipv6 headers struct */

	Rudprxms	= 200,	/* retransmit interval, ms; relackproc multiplies it by r->xmits */
	Rudptickms	= 50,	/* relackproc wakes up this often, ms */
	Rudpmaxxmit	= 10,	/* relrexmit hangs up once r->xmits exceeds this */
	Maxunacked	= 100,	/* rudpkick sleeps while UNACKED(r) exceeds this */
};
41 
/*
 *  Reserved generation value, used only in hangup messages:
 *  relsendack() puts it in relsgen when asked to send a hangup,
 *  reliput() treats an incoming relsgen of this value as a hangup,
 *  and relstate()/relhangup() step over it when handing out generations.
 */
#define Hangupgen	0xffffffff	/* used only in hangup messages */
43 
/*
 *  Overlay for the start of a packet: the first UDP_IPHDR bytes of the
 *  ip header, the UDP_PHDRSIZE byte pseudo header used for the
 *  checksum, and the udp header.  All multi-byte fields are byte
 *  arrays accessed with nhgets/hnputs.
 */
typedef struct Udphdr Udphdr;
struct Udphdr
{
	/* ip header */
	uchar	vihl;		/* Version and header length */
	uchar	tos;		/* Type of service */
	uchar	length[2];	/* packet length */
	uchar	id[2];		/* Identification */
	uchar	frag[2];	/* Fragment information */

	/* pseudo header starts here */
	uchar	Unused;		/* zeroed for the checksum; rudpiput saves and restores the incoming value */
	uchar	udpproto;	/* Protocol */
	uchar	udpplen[2];	/* Header plus data length */
	uchar	udpsrc[4];	/* Ip source */
	uchar	udpdst[4];	/* Ip destination */

	/* udp header */
	uchar	udpsport[2];	/* Source port */
	uchar	udpdport[2];	/* Destination port */
	uchar	udplen[2];	/* data length */
	uchar	udpcksum[2];	/* Checksum */
};
67 
/*
 *  Same layout as Udphdr with the 16 byte rudp header appended; the
 *  code casts one pointer to the other to reach the rel* fields.
 */
typedef struct Rudphdr Rudphdr;
struct Rudphdr
{
	/* ip header */
	uchar	vihl;		/* Version and header length */
	uchar	tos;		/* Type of service */
	uchar	length[2];	/* packet length */
	uchar	id[2];		/* Identification */
	uchar	frag[2];	/* Fragment information */

	/* pseudo header starts here */
	uchar	Unused;
	uchar	udpproto;	/* Protocol */
	uchar	udpplen[2];	/* Header plus data length */
	uchar	udpsrc[4];	/* Ip source */
	uchar	udpdst[4];	/* Ip destination */

	/* udp header */
	uchar	udpsport[2];	/* Source port */
	uchar	udpdport[2];	/* Destination port */
	uchar	udplen[2];	/* data length (includes rudp header) */
	uchar	udpcksum[2];	/* Checksum */

	/* rudp header */
	uchar	relseq[4];	/* id of this packet (or 0) */
	uchar	relsgen[4];	/* generation/time stamp of the sender (Hangupgen in a hangup) */
	uchar	relack[4];	/* packet being acked (or 0) */
	uchar	relagen[4];	/* generation/time stamp the ack refers to */
};
97 
98 
/*
 *  one state structure per destination
 *
 *  Records hang off Rudpcb.r in a singly linked list and are created
 *  on demand by relstate().  They are reference counted: one
 *  reference for being on the list plus one per relstate() caller,
 *  dropped with relput().
 */
typedef struct Reliable Reliable;
struct Reliable
{
	Ref;			/* reference count, see relstate/relput */

	Reliable *next;		/* next record of the same conversation */

	uchar	addr[IPaddrlen];	/* always V6 when put here */
	ushort	port;		/* remote port */

	Block	*unacked;	/* unacked msg list, linked through Block.list */
	Block	*unackedtail;	/*  and its tail */

	int	timeout;	/* time since first unacked msg sent (ms, advanced by relackproc) */
	int	xmits;		/* number of times first unacked msg sent */

	ulong	sndseq;		/* next packet to be sent */
	ulong	sndgen;		/*  and its generation */

	ulong	rcvseq;		/* last packet received */
	ulong	rcvgen;		/*  and its generation */

	ulong	acksent;	/* last ack sent */
	ulong	ackrcvd;	/* last msg for which ack was rcvd */

	/* flow control */
	QLock	lock;		/* held by rudpkick while it checks/sleeps */
	Rendez	vous;		/* rudpkick sleeps here; reliput and relhangup wake it */
	int	blocked;	/* set while rudpkick is asleep on vous */
};
132 
133 
134 
/* MIB II counters; printed, in this order, by rudpstats() */
typedef struct Rudpstats Rudpstats;
struct Rudpstats
{
	ulong	rudpInDatagrams;	/* bumped for every packet entering rudpiput */
	ulong	rudpNoPorts;		/* packets for which no conversation was found */
	ulong	rudpInErrors;		/* packets dropped for a bad checksum */
	ulong	rudpOutDatagrams;	/* packets sent by rudpkick */
};
144 
/*
 *  Per-protocol private data, allocated in rudpinit() and reached
 *  through Proto.priv.
 */
typedef struct Rudppriv Rudppriv;
struct Rudppriv
{
	Ipht	ht;			/* conversation lookup table (iphtadd/iphtrem/iphtlook) */

	/* MIB counters */
	Rudpstats	ustats;

	/* non-MIB stats */
	ulong	csumerr;		/* checksum errors */
	ulong	lenerr;			/* short packet */
	ulong	rxmits;			/* # of retransmissions */
	ulong	orders;			/* # of out of order pkts */

	/* keeping track of the ack kproc */
	int	ackprocstarted;		/* set once relackproc has been started */
	QLock	apl;			/* serializes starting the kproc */
};
163 
164 
/*
 *  generation: next send generation to hand out; relstate() seeds it
 *  from rand() while it is zero, and both relstate() and relhangup()
 *  step over Hangupgen before using it.
 *  rend: not referenced anywhere in the visible part of this file.
 */
static ulong generation = 0;
static Rendez rend;
167 
/*
 *  protocol specific part of Conv (reached through Conv.ptcl)
 */
typedef struct Rudpcb Rudpcb;
struct Rudpcb
{
	QLock;			/* protects the list r and the records on it */
	uchar	headers;	/* 0, or 7 after the "headers" ctl: user data carries an address header */
	uchar	randdrop;	/* percentage of output packets doipoput() discards; 0 disables */
	Reliable *r;		/* per-destination state, see relstate() */
};
179 
180 /*
181  * local functions
182  */
183 void	relsendack(Conv*, Reliable*, int);
184 int	reliput(Conv*, Block*, uchar*, ushort);
185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
186 void	relput(Reliable*);
187 void	relforget(Conv *, uchar*, int, int);
188 void	relackproc(void *);
189 void	relackq(Reliable *, Block*);
190 void	relhangup(Conv *, Reliable*);
191 void	relrexmit(Conv *, Reliable*);
192 void	relput(Reliable*);
193 void	rudpkick(void *x);
194 
195 static void
rudpstartackproc(Proto * rudp)196 rudpstartackproc(Proto *rudp)
197 {
198 	Rudppriv *rpriv;
199 	char kpname[KNAMELEN];
200 
201 	rpriv = rudp->priv;
202 	if(rpriv->ackprocstarted == 0){
203 		qlock(&rpriv->apl);
204 		if(rpriv->ackprocstarted == 0){
205 			snprint(kpname, sizeof kpname, "#I%drudpack",
206 				rudp->f->dev);
207 			kproc(kpname, relackproc, rudp);
208 			rpriv->ackprocstarted = 1;
209 		}
210 		qunlock(&rpriv->apl);
211 	}
212 }
213 
214 static char*
rudpconnect(Conv * c,char ** argv,int argc)215 rudpconnect(Conv *c, char **argv, int argc)
216 {
217 	char *e;
218 	Rudppriv *upriv;
219 
220 	upriv = c->p->priv;
221 	rudpstartackproc(c->p);
222 	e = Fsstdconnect(c, argv, argc);
223 	Fsconnected(c, e);
224 	iphtadd(&upriv->ht, c);
225 
226 	return e;
227 }
228 
229 
230 static int
rudpstate(Conv * c,char * state,int n)231 rudpstate(Conv *c, char *state, int n)
232 {
233 	Rudpcb *ucb;
234 	Reliable *r;
235 	int m;
236 
237 	m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
238 	ucb = (Rudpcb*)c->ptcl;
239 	qlock(ucb);
240 	for(r = ucb->r; r; r = r->next)
241 		m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
242 	m += snprint(state+m, n-m, "\n");
243 	qunlock(ucb);
244 	return m;
245 }
246 
247 static char*
rudpannounce(Conv * c,char ** argv,int argc)248 rudpannounce(Conv *c, char** argv, int argc)
249 {
250 	char *e;
251 	Rudppriv *upriv;
252 
253 	upriv = c->p->priv;
254 	rudpstartackproc(c->p);
255 	e = Fsstdannounce(c, argv, argc);
256 	if(e != nil)
257 		return e;
258 	Fsconnected(c, nil);
259 	iphtadd(&upriv->ht, c);
260 
261 	return nil;
262 }
263 
264 static void
rudpcreate(Conv * c)265 rudpcreate(Conv *c)
266 {
267 	c->rq = qopen(64*1024, Qmsg, 0, 0);
268 	c->wq = qopen(64*1024, Qkick, rudpkick, c);
269 }
270 
271 static void
rudpclose(Conv * c)272 rudpclose(Conv *c)
273 {
274 	Rudpcb *ucb;
275 	Reliable *r, *nr;
276 	Rudppriv *upriv;
277 
278 	upriv = c->p->priv;
279 	iphtrem(&upriv->ht, c);
280 
281 	/* force out any delayed acks */
282 	ucb = (Rudpcb*)c->ptcl;
283 	qlock(ucb);
284 	for(r = ucb->r; r; r = r->next){
285 		if(r->acksent != r->rcvseq)
286 			relsendack(c, r, 0);
287 	}
288 	qunlock(ucb);
289 
290 	qclose(c->rq);
291 	qclose(c->wq);
292 	qclose(c->eq);
293 	ipmove(c->laddr, IPnoaddr);
294 	ipmove(c->raddr, IPnoaddr);
295 	c->lport = 0;
296 	c->rport = 0;
297 
298 	ucb->headers = 0;
299 	ucb->randdrop = 0;
300 	qlock(ucb);
301 	for(r = ucb->r; r; r = nr){
302 		if(r->acksent != r->rcvseq)
303 			relsendack(c, r, 0);
304 		nr = r->next;
305 		relhangup(c, r);
306 		relput(r);
307 	}
308 	ucb->r = 0;
309 
310 	qunlock(ucb);
311 }
312 
313 /*
314  *  randomly don't send packets
315  */
316 static void
doipoput(Conv * c,Fs * f,Block * bp,int x,int ttl,int tos)317 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
318 {
319 	Rudpcb *ucb;
320 
321 	ucb = (Rudpcb*)c->ptcl;
322 	if(ucb->randdrop && nrand(100) < ucb->randdrop)
323 		freeblist(bp);
324 	else
325 		ipoput4(f, bp, x, ttl, tos, nil);
326 }
327 
328 int
flow(void * v)329 flow(void *v)
330 {
331 	Reliable *r = v;
332 
333 	return UNACKED(r) <= Maxunacked;
334 }
335 
336 void
rudpkick(void * x)337 rudpkick(void *x)
338 {
339 	Conv *c = x;
340 	Udphdr *uh;
341 	ushort rport;
342 	uchar laddr[IPaddrlen], raddr[IPaddrlen];
343 	Block *bp;
344 	Rudpcb *ucb;
345 	Rudphdr *rh;
346 	Reliable *r;
347 	int dlen, ptcllen;
348 	Rudppriv *upriv;
349 	Fs *f;
350 
351 	upriv = c->p->priv;
352 	f = c->p->f;
353 
354 	netlog(c->p->f, Logrudp, "rudp: kick\n");
355 	bp = qget(c->wq);
356 	if(bp == nil)
357 		return;
358 
359 	ucb = (Rudpcb*)c->ptcl;
360 	switch(ucb->headers) {
361 	case 7:
362 		/* get user specified addresses */
363 		bp = pullupblock(bp, UDP_USEAD7);
364 		if(bp == nil)
365 			return;
366 		ipmove(raddr, bp->rp);
367 		bp->rp += IPaddrlen;
368 		ipmove(laddr, bp->rp);
369 		bp->rp += IPaddrlen;
370 		/* pick interface closest to dest */
371 		if(ipforme(f, laddr) != Runi)
372 			findlocalip(f, laddr, raddr);
373 		bp->rp += IPaddrlen;		/* Ignore ifc address */
374 		rport = nhgets(bp->rp);
375 		bp->rp += 2+2;			/* Ignore local port */
376 		break;
377 	default:
378 		ipmove(raddr, c->raddr);
379 		ipmove(laddr, c->laddr);
380 		rport = c->rport;
381 		break;
382 	}
383 
384 	dlen = blocklen(bp);
385 
386 	/* Make space to fit rudp & ip header */
387 	bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
388 	if(bp == nil)
389 		return;
390 
391 	uh = (Udphdr *)(bp->rp);
392 	uh->vihl = IP_VER4;
393 
394 	rh = (Rudphdr*)uh;
395 
396 	ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
397 	uh->Unused = 0;
398 	uh->udpproto = IP_UDPPROTO;
399 	uh->frag[0] = 0;
400 	uh->frag[1] = 0;
401 	hnputs(uh->udpplen, ptcllen);
402 	switch(ucb->headers){
403 	case 7:
404 		v6tov4(uh->udpdst, raddr);
405 		hnputs(uh->udpdport, rport);
406 		v6tov4(uh->udpsrc, laddr);
407 		break;
408 	default:
409 		v6tov4(uh->udpdst, c->raddr);
410 		hnputs(uh->udpdport, c->rport);
411 		if(ipcmp(c->laddr, IPnoaddr) == 0)
412 			findlocalip(f, c->laddr, c->raddr);
413 		v6tov4(uh->udpsrc, c->laddr);
414 		break;
415 	}
416 	hnputs(uh->udpsport, c->lport);
417 	hnputs(uh->udplen, ptcllen);
418 	uh->udpcksum[0] = 0;
419 	uh->udpcksum[1] = 0;
420 
421 	qlock(ucb);
422 	r = relstate(ucb, raddr, rport, "kick");
423 	r->sndseq = NEXTSEQ(r->sndseq);
424 	hnputl(rh->relseq, r->sndseq);
425 	hnputl(rh->relsgen, r->sndgen);
426 
427 	hnputl(rh->relack, r->rcvseq);  /* ACK last rcvd packet */
428 	hnputl(rh->relagen, r->rcvgen);
429 
430 	if(r->rcvseq != r->acksent)
431 		r->acksent = r->rcvseq;
432 
433 	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
434 
435 	relackq(r, bp);
436 	qunlock(ucb);
437 
438 	upriv->ustats.rudpOutDatagrams++;
439 
440 	DPRINT("sent: %lud/%lud, %lud/%lud\n",
441 		r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
442 
443 	doipoput(c, f, bp, 0, c->ttl, c->tos);
444 
445 	if(waserror()) {
446 		relput(r);
447 		qunlock(&r->lock);
448 		nexterror();
449 	}
450 
451 	/* flow control of sorts */
452 	qlock(&r->lock);
453 	if(UNACKED(r) > Maxunacked){
454 		r->blocked = 1;
455 		sleep(&r->vous, flow, r);
456 		r->blocked = 0;
457 	}
458 
459 	qunlock(&r->lock);
460 	relput(r);
461 	poperror();
462 }
463 
464 void
rudpiput(Proto * rudp,Ipifc * ifc,Block * bp)465 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
466 {
467 	int len, olen, ottl;
468 	Udphdr *uh;
469 	Conv *c;
470 	Rudpcb *ucb;
471 	uchar raddr[IPaddrlen], laddr[IPaddrlen];
472 	ushort rport, lport;
473 	Rudppriv *upriv;
474 	Fs *f;
475 	uchar *p;
476 
477 	upriv = rudp->priv;
478 	f = rudp->f;
479 
480 	upriv->ustats.rudpInDatagrams++;
481 
482 	uh = (Udphdr*)(bp->rp);
483 
484 	/* Put back pseudo header for checksum
485 	 * (remember old values for icmpnoconv())
486 	 */
487 	ottl = uh->Unused;
488 	uh->Unused = 0;
489 	len = nhgets(uh->udplen);
490 	olen = nhgets(uh->udpplen);
491 	hnputs(uh->udpplen, len);
492 
493 	v4tov6(raddr, uh->udpsrc);
494 	v4tov6(laddr, uh->udpdst);
495 	lport = nhgets(uh->udpdport);
496 	rport = nhgets(uh->udpsport);
497 
498 	if(nhgets(uh->udpcksum)) {
499 		if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
500 			upriv->ustats.rudpInErrors++;
501 			upriv->csumerr++;
502 			netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
503 			DPRINT("rudp: checksum error %I\n", raddr);
504 			freeblist(bp);
505 			return;
506 		}
507 	}
508 
509 	qlock(rudp);
510 
511 	c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
512 	if(c == nil){
513 		/* no conversation found */
514 		upriv->ustats.rudpNoPorts++;
515 		qunlock(rudp);
516 		netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
517 			laddr, lport);
518 		uh->Unused = ottl;
519 		hnputs(uh->udpplen, olen);
520 		icmpnoconv(f, bp);
521 		freeblist(bp);
522 		return;
523 	}
524 	ucb = (Rudpcb*)c->ptcl;
525 	qlock(ucb);
526 	qunlock(rudp);
527 
528 	if(reliput(c, bp, raddr, rport) < 0){
529 		qunlock(ucb);
530 		freeb(bp);
531 		return;
532 	}
533 
534 	/*
535 	 * Trim the packet down to data size
536 	 */
537 
538 	len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
539 	bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
540 	if(bp == nil) {
541 		netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
542 			raddr, rport, laddr, lport);
543 		DPRINT("rudp: len err %I.%d -> %I.%d\n",
544 			raddr, rport, laddr, lport);
545 		upriv->lenerr++;
546 		return;
547 	}
548 
549 	netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
550 		raddr, rport, laddr, lport, len);
551 
552 	switch(ucb->headers){
553 	case 7:
554 		/* pass the src address */
555 		bp = padblock(bp, UDP_USEAD7);
556 		p = bp->rp;
557 		ipmove(p, raddr); p += IPaddrlen;
558 		ipmove(p, laddr); p += IPaddrlen;
559 		ipmove(p, ifc->lifc->local); p += IPaddrlen;
560 		hnputs(p, rport); p += 2;
561 		hnputs(p, lport);
562 		break;
563 	default:
564 		/* connection oriented rudp */
565 		if(ipcmp(c->raddr, IPnoaddr) == 0){
566 			/* save the src address in the conversation */
567 		 	ipmove(c->raddr, raddr);
568 			c->rport = rport;
569 
570 			/* reply with the same ip address (if not broadcast) */
571 			if(ipforme(f, laddr) == Runi)
572 				ipmove(c->laddr, laddr);
573 			else
574 				v4tov6(c->laddr, ifc->lifc->local);
575 		}
576 		break;
577 	}
578 	if(bp->next)
579 		bp = concatblock(bp);
580 
581 	if(qfull(c->rq)) {
582 		netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
583 			laddr, lport);
584 		freeblist(bp);
585 	}
586 	else
587 		qpass(c->rq, bp);
588 
589 	qunlock(ucb);
590 }
591 
592 static char *rudpunknown = "unknown rudp ctl request";
593 
594 char*
rudpctl(Conv * c,char ** f,int n)595 rudpctl(Conv *c, char **f, int n)
596 {
597 	Rudpcb *ucb;
598 	uchar ip[IPaddrlen];
599 	int x;
600 
601 	ucb = (Rudpcb*)c->ptcl;
602 	if(n < 1)
603 		return rudpunknown;
604 
605 	if(strcmp(f[0], "headers") == 0){
606 		ucb->headers = 7;		/* new headers format */
607 		return nil;
608 	} else if(strcmp(f[0], "hangup") == 0){
609 		if(n < 3)
610 			return "bad syntax";
611 		if (parseip(ip, f[1]) == -1)
612 			return Ebadip;
613 		x = atoi(f[2]);
614 		qlock(ucb);
615 		relforget(c, ip, x, 1);
616 		qunlock(ucb);
617 		return nil;
618 	} else if(strcmp(f[0], "randdrop") == 0){
619 		x = 10;			/* default is 10% */
620 		if(n > 1)
621 			x = atoi(f[1]);
622 		if(x > 100 || x < 0)
623 			return "illegal rudp drop rate";
624 		ucb->randdrop = x;
625 		return nil;
626 	}
627 	return rudpunknown;
628 }
629 
630 void
rudpadvise(Proto * rudp,Block * bp,char * msg)631 rudpadvise(Proto *rudp, Block *bp, char *msg)
632 {
633 	Udphdr *h;
634 	uchar source[IPaddrlen], dest[IPaddrlen];
635 	ushort psource, pdest;
636 	Conv *s, **p;
637 
638 	h = (Udphdr*)(bp->rp);
639 
640 	v4tov6(dest, h->udpdst);
641 	v4tov6(source, h->udpsrc);
642 	psource = nhgets(h->udpsport);
643 	pdest = nhgets(h->udpdport);
644 
645 	/* Look for a connection */
646 	for(p = rudp->conv; *p; p++) {
647 		s = *p;
648 		if(s->rport == pdest)
649 		if(s->lport == psource)
650 		if(ipcmp(s->raddr, dest) == 0)
651 		if(ipcmp(s->laddr, source) == 0){
652 			qhangup(s->rq, msg);
653 			qhangup(s->wq, msg);
654 			break;
655 		}
656 	}
657 	freeblist(bp);
658 }
659 
660 int
rudpstats(Proto * rudp,char * buf,int len)661 rudpstats(Proto *rudp, char *buf, int len)
662 {
663 	Rudppriv *upriv;
664 
665 	upriv = rudp->priv;
666 	return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
667 		upriv->ustats.rudpInDatagrams,
668 		upriv->ustats.rudpNoPorts,
669 		upriv->ustats.rudpInErrors,
670 		upriv->ustats.rudpOutDatagrams,
671 		upriv->rxmits,
672 		upriv->orders);
673 }
674 
675 void
rudpinit(Fs * fs)676 rudpinit(Fs *fs)
677 {
678 
679 	Proto *rudp;
680 
681 	rudp = smalloc(sizeof(Proto));
682 	rudp->priv = smalloc(sizeof(Rudppriv));
683 	rudp->name = "rudp";
684 	rudp->connect = rudpconnect;
685 	rudp->announce = rudpannounce;
686 	rudp->ctl = rudpctl;
687 	rudp->state = rudpstate;
688 	rudp->create = rudpcreate;
689 	rudp->close = rudpclose;
690 	rudp->rcv = rudpiput;
691 	rudp->advise = rudpadvise;
692 	rudp->stats = rudpstats;
693 	rudp->ipproto = IP_UDPPROTO;
694 	rudp->nc = 32;
695 	rudp->ptclsize = sizeof(Rudpcb);
696 
697 	Fsproto(fs, rudp);
698 }
699 
700 /*********************************************/
/* Here start the reliable helper functions  */
702 /*********************************************/
703 /*
704  *  Enqueue a copy of an unacked block for possible retransmissions
705  */
706 void
relackq(Reliable * r,Block * bp)707 relackq(Reliable *r, Block *bp)
708 {
709 	Block *np;
710 
711 	np = copyblock(bp, blocklen(bp));
712 	if(r->unacked)
713 		r->unackedtail->list = np;
714 	else {
715 		/* restart timer */
716 		r->timeout = 0;
717 		r->xmits = 1;
718 		r->unacked = np;
719 	}
720 	r->unackedtail = np;
721 	np->list = nil;
722 }
723 
724 /*
725  *  retransmit unacked blocks
726  */
727 void
relackproc(void * a)728 relackproc(void *a)
729 {
730 	Rudpcb *ucb;
731 	Proto *rudp;
732 	Reliable *r;
733 	Conv **s, *c;
734 
735 	rudp = (Proto *)a;
736 
737 loop:
738 	tsleep(&up->sleep, return0, 0, Rudptickms);
739 
740 	for(s = rudp->conv; *s; s++) {
741 		c = *s;
742 		ucb = (Rudpcb*)c->ptcl;
743 		qlock(ucb);
744 
745 		for(r = ucb->r; r; r = r->next) {
746 			if(r->unacked != nil){
747 				r->timeout += Rudptickms;
748 				if(r->timeout > Rudprxms*r->xmits)
749 					relrexmit(c, r);
750 			}
751 			if(r->acksent != r->rcvseq)
752 				relsendack(c, r, 0);
753 		}
754 		qunlock(ucb);
755 	}
756 	goto loop;
757 }
758 
759 /*
760  *  get the state record for a conversation
761  */
762 Reliable*
relstate(Rudpcb * ucb,uchar * addr,ushort port,char * from)763 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
764 {
765 	Reliable *r, **l;
766 
767 	l = &ucb->r;
768 	for(r = *l; r; r = *l){
769 		if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
770 		    port == r->port)
771 			break;
772 		l = &r->next;
773 	}
774 
775 	/* no state for this addr/port, create some */
776 	if(r == nil){
777 		while(generation == 0)
778 			generation = rand();
779 
780 		DPRINT("from %s new state %lud for %I!%ud\n",
781 		        from, generation, addr, port);
782 
783 		r = smalloc(sizeof(Reliable));
784 		memmove(r->addr, addr, IPaddrlen);
785 		r->port = port;
786 		r->unacked = 0;
787 		if(generation == Hangupgen)
788 			generation++;
789 		r->sndgen = generation++;
790 		r->sndseq = 0;
791 		r->ackrcvd = 0;
792 		r->rcvgen = 0;
793 		r->rcvseq = 0;
794 		r->acksent = 0;
795 		r->xmits = 0;
796 		r->timeout = 0;
797 		r->ref = 0;
798 		incref(r);	/* one reference for being in the list */
799 
800 		*l = r;
801 	}
802 
803 	incref(r);
804 	return r;
805 }
806 
807 void
relput(Reliable * r)808 relput(Reliable *r)
809 {
810 	if(decref(r) == 0)
811 		free(r);
812 }
813 
814 /*
815  *  forget a Reliable state
816  */
817 void
relforget(Conv * c,uchar * ip,int port,int originator)818 relforget(Conv *c, uchar *ip, int port, int originator)
819 {
820 	Rudpcb *ucb;
821 	Reliable *r, **l;
822 
823 	ucb = (Rudpcb*)c->ptcl;
824 
825 	l = &ucb->r;
826 	for(r = *l; r; r = *l){
827 		if(ipcmp(ip, r->addr) == 0 && port == r->port){
828 			*l = r->next;
829 			if(originator)
830 				relsendack(c, r, 1);
831 			relhangup(c, r);
832 			relput(r);	/* remove from the list */
833 			break;
834 		}
835 		l = &r->next;
836 	}
837 }
838 
839 /*
840  *  process a rcvd reliable packet. return -1 if not to be passed to user process,
841  *  0 therwise.
842  *
843  *  called with ucb locked.
844  */
845 int
reliput(Conv * c,Block * bp,uchar * addr,ushort port)846 reliput(Conv *c, Block *bp, uchar *addr, ushort port)
847 {
848 	Block *nbp;
849 	Rudpcb *ucb;
850 	Rudppriv *upriv;
851 	Udphdr *uh;
852 	Reliable *r;
853 	Rudphdr *rh;
854 	ulong seq, ack, sgen, agen, ackreal;
855 	int rv = -1;
856 
857 	/* get fields */
858 	uh = (Udphdr*)(bp->rp);
859 	rh = (Rudphdr*)uh;
860 	seq = nhgetl(rh->relseq);
861 	sgen = nhgetl(rh->relsgen);
862 	ack = nhgetl(rh->relack);
863 	agen = nhgetl(rh->relagen);
864 
865 	upriv = c->p->priv;
866 	ucb = (Rudpcb*)c->ptcl;
867 	r = relstate(ucb, addr, port, "input");
868 
869 	DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
870 		seq, sgen, ack, agen, r->sndgen);
871 
872 	/* if acking an incorrect generation, ignore */
873 	if(ack && agen != r->sndgen)
874 		goto out;
875 
876 	/* Look for a hangup */
877 	if(sgen == Hangupgen) {
878 		if(agen == r->sndgen)
879 			relforget(c, addr, port, 0);
880 		goto out;
881 	}
882 
883 	/* make sure we're not talking to a new remote side */
884 	if(r->rcvgen != sgen){
885 		if(seq != 0 && seq != 1)
886 			goto out;
887 
888 		/* new connection */
889 		if(r->rcvgen != 0){
890 			DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
891 			relhangup(c, r);
892 		}
893 		r->rcvgen = sgen;
894 	}
895 
896 	/* dequeue acked packets */
897 	if(ack && agen == r->sndgen){
898 		ackreal = 0;
899 		while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
900 			nbp = r->unacked;
901 			r->unacked = nbp->list;
902 			DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
903 			       ack, agen, r->sndgen);
904 			freeb(nbp);
905 			r->ackrcvd = NEXTSEQ(r->ackrcvd);
906 			ackreal = 1;
907 		}
908 
909 		/* flow control */
910 		if(UNACKED(r) < Maxunacked/8 && r->blocked)
911 			wakeup(&r->vous);
912 
913 		/*
914 		 *  retransmit next packet if the acked packet
915 		 *  was transmitted more than once
916 		 */
917 		if(ackreal && r->unacked != nil){
918 			r->timeout = 0;
919 			if(r->xmits > 1){
920 				r->xmits = 1;
921 				relrexmit(c, r);
922 			}
923 		}
924 
925 	}
926 
927 	/* no message or input queue full */
928 	if(seq == 0 || qfull(c->rq))
929 		goto out;
930 
931 	/* refuse out of order delivery */
932 	if(seq != NEXTSEQ(r->rcvseq)){
933 		relsendack(c, r, 0);	/* tell him we got it already */
934 		upriv->orders++;
935 		DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
936 		goto out;
937 	}
938 	r->rcvseq = seq;
939 
940 	rv = 0;
941 out:
942 	relput(r);
943 	return rv;
944 }
945 
946 void
relsendack(Conv * c,Reliable * r,int hangup)947 relsendack(Conv *c, Reliable *r, int hangup)
948 {
949 	Udphdr *uh;
950 	Block *bp;
951 	Rudphdr *rh;
952 	int ptcllen;
953 	Fs *f;
954 
955 	bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
956 	if(bp == nil)
957 		return;
958 	bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
959 	f = c->p->f;
960 	uh = (Udphdr *)(bp->rp);
961 	uh->vihl = IP_VER4;
962 	rh = (Rudphdr*)uh;
963 
964 	ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
965 	uh->Unused = 0;
966 	uh->udpproto = IP_UDPPROTO;
967 	uh->frag[0] = 0;
968 	uh->frag[1] = 0;
969 	hnputs(uh->udpplen, ptcllen);
970 
971 	v6tov4(uh->udpdst, r->addr);
972 	hnputs(uh->udpdport, r->port);
973 	hnputs(uh->udpsport, c->lport);
974 	if(ipcmp(c->laddr, IPnoaddr) == 0)
975 		findlocalip(f, c->laddr, c->raddr);
976 	v6tov4(uh->udpsrc, c->laddr);
977 	hnputs(uh->udplen, ptcllen);
978 
979 	if(hangup)
980 		hnputl(rh->relsgen, Hangupgen);
981 	else
982 		hnputl(rh->relsgen, r->sndgen);
983 	hnputl(rh->relseq, 0);
984 	hnputl(rh->relagen, r->rcvgen);
985 	hnputl(rh->relack, r->rcvseq);
986 
987 	if(r->acksent < r->rcvseq)
988 		r->acksent = r->rcvseq;
989 
990 	uh->udpcksum[0] = 0;
991 	uh->udpcksum[1] = 0;
992 	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
993 
994 	DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
995 	doipoput(c, f, bp, 0, c->ttl, c->tos);
996 }
997 
998 
999 /*
1000  *  called with ucb locked (and c locked if user initiated close)
1001  */
1002 void
relhangup(Conv * c,Reliable * r)1003 relhangup(Conv *c, Reliable *r)
1004 {
1005 	int n;
1006 	Block *bp;
1007 	char hup[ERRMAX];
1008 
1009 	n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
1010 	qproduce(c->eq, hup, n);
1011 
1012 	/*
1013 	 *  dump any unacked outgoing messages
1014 	 */
1015 	for(bp = r->unacked; bp != nil; bp = r->unacked){
1016 		r->unacked = bp->list;
1017 		bp->list = nil;
1018 		freeb(bp);
1019 	}
1020 
1021 	r->rcvgen = 0;
1022 	r->rcvseq = 0;
1023 	r->acksent = 0;
1024 	if(generation == Hangupgen)
1025 		generation++;
1026 	r->sndgen = generation++;
1027 	r->sndseq = 0;
1028 	r->ackrcvd = 0;
1029 	r->xmits = 0;
1030 	r->timeout = 0;
1031 	wakeup(&r->vous);
1032 }
1033 
1034 /*
1035  *  called with ucb locked
1036  */
1037 void
relrexmit(Conv * c,Reliable * r)1038 relrexmit(Conv *c, Reliable *r)
1039 {
1040 	Rudppriv *upriv;
1041 	Block *np;
1042 	Fs *f;
1043 
1044 	upriv = c->p->priv;
1045 	f = c->p->f;
1046 	r->timeout = 0;
1047 	if(r->xmits++ > Rudpmaxxmit){
1048 		relhangup(c, r);
1049 		return;
1050 	}
1051 
1052 	upriv->rxmits++;
1053 	np = copyblock(r->unacked, blocklen(r->unacked));
1054 	DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
1055 	doipoput(c, f, np, 0, c->ttl, c->tos);
1056 }
1057