xref: /plan9-contrib/sys/src/9/ip/rudp.c (revision 25fc69938fdecc61cd09e795cbe2d2f72f1082b1)
1 /*
2  *  Reliable User Datagram Protocol, currently only for IPv4.
3  *  This protocol is compatible with UDP's packet format.
4  *  It could be done over UDP if need be.
5  */
6 #include	"u.h"
7 #include	"../port/lib.h"
8 #include	"mem.h"
9 #include	"dat.h"
10 #include	"fns.h"
11 #include	"../port/error.h"
12 
13 #include	"ip.h"
14 
15 #define DEBUG	0
16 #define DPRINT if(DEBUG)print
17 
/*
 *  Sequence number arithmetic.  Sequence numbers skip 0 when they
 *  wrap (see NEXTSEQ); a 0 in a header field means "none".
 *  NEXTSEQ relies on the operand wrapping to 0 after 0xffffffff,
 *  i.e. it assumes a 32-bit ulong -- TODO confirm for new ports.
 *  All of these evaluate their arguments more than once, so the
 *  arguments must be free of side effects.
 */

/* forward distance from b to a, allowing for wrap around */
#define SEQDIFF(a,b) ( (a)>=(b)?\
			(a)-(b):\
			0xffffffffUL-((b)-(a)) )
/* non-zero if a lies in the window (start, end], allowing for wrap around */
#define INSEQ(a,start,end) ( (start)<=(end)?\
				((a)>(start)&&(a)<=(end)):\
				((a)>(start)||(a)<=(end)) )
/* distance between the last sequence sent to r and the last one it acked */
#define UNACKED(r) SEQDIFF((r)->sndseq, (r)->ackrcvd)
/* successor of a; never yields 0 */
#define NEXTSEQ(a) ( (a)+1 == 0 ? 1 : (a)+1 )
26 
/*
 *  Header sizes (in bytes), the protocol number and the
 *  retransmission/flow control parameters.
 */
enum
{
	/* header layout */
	UDP_PHDRSIZE	= 12,	/* pseudo header alone */
/*	UDP_HDRSIZE	= 20,	   pseudo header + udp header (not used) */
	UDP_RHDRSIZE	= 36,	/* pseudo header + udp header + rudp header */
	UDP_IPHDR	= 8,	/* part of the ip header ahead of the pseudo header */
	IP_UDPPROTO	= 254,	/* ip protocol number registered by rudpinit */
	UDP_USEAD7	= 52,	/* size of new ipv6 headers struct */

	/* timers and limits */
	Rudprxms	= 200,	/* retransmit after this long, times xmits */
	Rudptickms	= 50,	/* how often relackproc runs */
	Rudpmaxxmit	= 10,	/* transmissions before giving up on a peer */
	Maxunacked	= 100,	/* writers block beyond this many unacked */
};

/* generation value used only in hangup messages */
#define Hangupgen	0xffffffff
43 
44 typedef struct Udphdr Udphdr;
45 struct Udphdr
46 {
47 	/* ip header */
48 	uchar	vihl;		/* Version and header length */
49 	uchar	tos;		/* Type of service */
50 	uchar	length[2];	/* packet length */
51 	uchar	id[2];		/* Identification */
52 	uchar	frag[2];	/* Fragment information */
53 
54 	/* pseudo header starts here */
55 	uchar	Unused;
56 	uchar	udpproto;	/* Protocol */
57 	uchar	udpplen[2];	/* Header plus data length */
58 	uchar	udpsrc[4];	/* Ip source */
59 	uchar	udpdst[4];	/* Ip destination */
60 
61 	/* udp header */
62 	uchar	udpsport[2];	/* Source port */
63 	uchar	udpdport[2];	/* Destination port */
64 	uchar	udplen[2];	/* data length */
65 	uchar	udpcksum[2];	/* Checksum */
66 };
67 
68 typedef struct Rudphdr Rudphdr;
69 struct Rudphdr
70 {
71 	/* ip header */
72 	uchar	vihl;		/* Version and header length */
73 	uchar	tos;		/* Type of service */
74 	uchar	length[2];	/* packet length */
75 	uchar	id[2];		/* Identification */
76 	uchar	frag[2];	/* Fragment information */
77 
78 	/* pseudo header starts here */
79 	uchar	Unused;
80 	uchar	udpproto;	/* Protocol */
81 	uchar	udpplen[2];	/* Header plus data length */
82 	uchar	udpsrc[4];	/* Ip source */
83 	uchar	udpdst[4];	/* Ip destination */
84 
85 	/* udp header */
86 	uchar	udpsport[2];	/* Source port */
87 	uchar	udpdport[2];	/* Destination port */
88 	uchar	udplen[2];	/* data length (includes rudp header) */
89 	uchar	udpcksum[2];	/* Checksum */
90 
91 	/* rudp header */
92 	uchar	relseq[4];	/* id of this packet (or 0) */
93 	uchar	relsgen[4];	/* generation/time stamp */
94 	uchar	relack[4];	/* packet being acked (or 0) */
95 	uchar	relagen[4];	/* generation/time stamp */
96 };
97 
98 
99 /*
100  *  one state structure per destination
101  */
102 typedef struct Reliable Reliable;
103 struct Reliable
104 {
105 	Ref;
106 
107 	Reliable *next;
108 
109 	uchar	addr[IPaddrlen];	/* always V6 when put here */
110 	ushort	port;
111 
112 	Block	*unacked;	/* unacked msg list */
113 	Block	*unackedtail;	/*  and its tail */
114 
115 	int	timeout;	/* time since first unacked msg sent */
116 	int	xmits;		/* number of times first unacked msg sent */
117 
118 	ulong	sndseq;		/* next packet to be sent */
119 	ulong	sndgen;		/*  and its generation */
120 
121 	ulong	rcvseq;		/* last packet received */
122 	ulong	rcvgen;		/*  and its generation */
123 
124 	ulong	acksent;	/* last ack sent */
125 	ulong	ackrcvd;	/* last msg for which ack was rcvd */
126 
127 	/* flow control */
128 	QLock	lock;
129 	Rendez	vous;
130 	int	blocked;
131 };
132 
133 
134 
135 /* MIB II counters */
136 typedef struct Rudpstats Rudpstats;
137 struct Rudpstats
138 {
139 	ulong	rudpInDatagrams;
140 	ulong	rudpNoPorts;
141 	ulong	rudpInErrors;
142 	ulong	rudpOutDatagrams;
143 };
144 
145 typedef struct Rudppriv Rudppriv;
146 struct Rudppriv
147 {
148 	Ipht	ht;
149 
150 	/* MIB counters */
151 	Rudpstats	ustats;
152 
153 	/* non-MIB stats */
154 	ulong	csumerr;		/* checksum errors */
155 	ulong	lenerr;			/* short packet */
156 	ulong	rxmits;			/* # of retransmissions */
157 	ulong	orders;			/* # of out of order pkts */
158 
159 	/* keeping track of the ack kproc */
160 	int	ackprocstarted;
161 	QLock	apl;
162 };
163 
164 
165 static ulong generation = 0;
166 static Rendez rend;
167 
168 /*
169  *  protocol specific part of Conv
170  */
171 typedef struct Rudpcb Rudpcb;
172 struct Rudpcb
173 {
174 	QLock;
175 	uchar	headers;
176 	uchar	randdrop;
177 	Reliable *r;
178 };
179 
180 /*
181  * local functions
182  */
183 void	relsendack(Conv*, Reliable*, int);
184 int	reliput(Conv*, Block*, uchar*, ushort);
185 Reliable *relstate(Rudpcb*, uchar*, ushort, char*);
186 void	relput(Reliable*);
187 void	relforget(Conv *, uchar*, int, int);
188 void	relackproc(void *);
189 void	relackq(Reliable *, Block*);
190 void	relhangup(Conv *, Reliable*);
191 void	relrexmit(Conv *, Reliable*);
192 void	relput(Reliable*);
193 void	rudpkick(void *x);
194 
195 static void
196 rudpstartackproc(Proto *rudp)
197 {
198 	Rudppriv *rpriv;
199 	char kpname[KNAMELEN];
200 
201 	rpriv = rudp->priv;
202 	if(rpriv->ackprocstarted == 0){
203 		qlock(&rpriv->apl);
204 		if(rpriv->ackprocstarted == 0){
205 			sprint(kpname, "#I%drudpack", rudp->f->dev);
206 			kproc(kpname, relackproc, rudp);
207 			rpriv->ackprocstarted = 1;
208 		}
209 		qunlock(&rpriv->apl);
210 	}
211 }
212 
213 static char*
214 rudpconnect(Conv *c, char **argv, int argc)
215 {
216 	char *e;
217 	Rudppriv *upriv;
218 
219 	upriv = c->p->priv;
220 	rudpstartackproc(c->p);
221 	e = Fsstdconnect(c, argv, argc);
222 	Fsconnected(c, e);
223 	iphtadd(&upriv->ht, c);
224 
225 	return e;
226 }
227 
228 
229 static int
230 rudpstate(Conv *c, char *state, int n)
231 {
232 	Rudpcb *ucb;
233 	Reliable *r;
234 	int m;
235 
236 	m = snprint(state, n, "%s", c->inuse?"Open":"Closed");
237 	ucb = (Rudpcb*)c->ptcl;
238 	qlock(ucb);
239 	for(r = ucb->r; r; r = r->next)
240 		m += snprint(state+m, n-m, " %I/%ld", r->addr, UNACKED(r));
241 	m += snprint(state+m, n-m, "\n");
242 	qunlock(ucb);
243 	return m;
244 }
245 
246 static char*
247 rudpannounce(Conv *c, char** argv, int argc)
248 {
249 	char *e;
250 	Rudppriv *upriv;
251 
252 	upriv = c->p->priv;
253 	rudpstartackproc(c->p);
254 	e = Fsstdannounce(c, argv, argc);
255 	if(e != nil)
256 		return e;
257 	Fsconnected(c, nil);
258 	iphtadd(&upriv->ht, c);
259 
260 	return nil;
261 }
262 
263 static void
264 rudpcreate(Conv *c)
265 {
266 	c->rq = qopen(64*1024, Qmsg, 0, 0);
267 	c->wq = qopen(64*1024, Qkick, rudpkick, c);
268 }
269 
270 static void
271 rudpclose(Conv *c)
272 {
273 	Rudpcb *ucb;
274 	Reliable *r, *nr;
275 	Rudppriv *upriv;
276 
277 	upriv = c->p->priv;
278 	iphtrem(&upriv->ht, c);
279 
280 	/* force out any delayed acks */
281 	ucb = (Rudpcb*)c->ptcl;
282 	qlock(ucb);
283 	for(r = ucb->r; r; r = r->next){
284 		if(r->acksent != r->rcvseq)
285 			relsendack(c, r, 0);
286 	}
287 	qunlock(ucb);
288 
289 	qclose(c->rq);
290 	qclose(c->wq);
291 	qclose(c->eq);
292 	ipmove(c->laddr, IPnoaddr);
293 	ipmove(c->raddr, IPnoaddr);
294 	c->lport = 0;
295 	c->rport = 0;
296 
297 	ucb->headers = 0;
298 	ucb->randdrop = 0;
299 	qlock(ucb);
300 	for(r = ucb->r; r; r = nr){
301 		if(r->acksent != r->rcvseq)
302 			relsendack(c, r, 0);
303 		nr = r->next;
304 		relhangup(c, r);
305 		relput(r);
306 	}
307 	ucb->r = 0;
308 
309 	qunlock(ucb);
310 }
311 
312 /*
313  *  randomly don't send packets
314  */
315 static void
316 doipoput(Conv *c, Fs *f, Block *bp, int x, int ttl, int tos)
317 {
318 	Rudpcb *ucb;
319 
320 	ucb = (Rudpcb*)c->ptcl;
321 	if(ucb->randdrop && nrand(100) < ucb->randdrop)
322 		freeblist(bp);
323 	else
324 		ipoput4(f, bp, x, ttl, tos, nil);
325 }
326 
327 int
328 flow(void *v)
329 {
330 	Reliable *r = v;
331 
332 	return UNACKED(r) <= Maxunacked;
333 }
334 
335 void
336 rudpkick(void *x)
337 {
338 	Conv *c = x;
339 	Udphdr *uh;
340 	ushort rport;
341 	uchar laddr[IPaddrlen], raddr[IPaddrlen];
342 	Block *bp;
343 	Rudpcb *ucb;
344 	Rudphdr *rh;
345 	Reliable *r;
346 	int dlen, ptcllen;
347 	Rudppriv *upriv;
348 	Fs *f;
349 
350 	upriv = c->p->priv;
351 	f = c->p->f;
352 
353 	netlog(c->p->f, Logrudp, "rudp: kick\n");
354 	bp = qget(c->wq);
355 	if(bp == nil)
356 		return;
357 
358 	ucb = (Rudpcb*)c->ptcl;
359 	switch(ucb->headers) {
360 	case 7:
361 		/* get user specified addresses */
362 		bp = pullupblock(bp, UDP_USEAD7);
363 		if(bp == nil)
364 			return;
365 		ipmove(raddr, bp->rp);
366 		bp->rp += IPaddrlen;
367 		ipmove(laddr, bp->rp);
368 		bp->rp += IPaddrlen;
369 		/* pick interface closest to dest */
370 		if(ipforme(f, laddr) != Runi)
371 			findlocalip(f, laddr, raddr);
372 		bp->rp += IPaddrlen;		/* Ignore ifc address */
373 		rport = nhgets(bp->rp);
374 		bp->rp += 2+2;			/* Ignore local port */
375 		break;
376 	default:
377 		ipmove(raddr, c->raddr);
378 		ipmove(laddr, c->laddr);
379 		rport = c->rport;
380 		break;
381 	}
382 
383 	dlen = blocklen(bp);
384 
385 	/* Make space to fit rudp & ip header */
386 	bp = padblock(bp, UDP_IPHDR+UDP_RHDRSIZE);
387 	if(bp == nil)
388 		return;
389 
390 	uh = (Udphdr *)(bp->rp);
391 	uh->vihl = IP_VER4;
392 
393 	rh = (Rudphdr*)uh;
394 
395 	ptcllen = dlen + (UDP_RHDRSIZE-UDP_PHDRSIZE);
396 	uh->Unused = 0;
397 	uh->udpproto = IP_UDPPROTO;
398 	uh->frag[0] = 0;
399 	uh->frag[1] = 0;
400 	hnputs(uh->udpplen, ptcllen);
401 	switch(ucb->headers){
402 	case 7:
403 		v6tov4(uh->udpdst, raddr);
404 		hnputs(uh->udpdport, rport);
405 		v6tov4(uh->udpsrc, laddr);
406 		break;
407 	default:
408 		v6tov4(uh->udpdst, c->raddr);
409 		hnputs(uh->udpdport, c->rport);
410 		if(ipcmp(c->laddr, IPnoaddr) == 0)
411 			findlocalip(f, c->laddr, c->raddr);
412 		v6tov4(uh->udpsrc, c->laddr);
413 		break;
414 	}
415 	hnputs(uh->udpsport, c->lport);
416 	hnputs(uh->udplen, ptcllen);
417 	uh->udpcksum[0] = 0;
418 	uh->udpcksum[1] = 0;
419 
420 	qlock(ucb);
421 	r = relstate(ucb, raddr, rport, "kick");
422 	r->sndseq = NEXTSEQ(r->sndseq);
423 	hnputl(rh->relseq, r->sndseq);
424 	hnputl(rh->relsgen, r->sndgen);
425 
426 	hnputl(rh->relack, r->rcvseq);  /* ACK last rcvd packet */
427 	hnputl(rh->relagen, r->rcvgen);
428 
429 	if(r->rcvseq != r->acksent)
430 		r->acksent = r->rcvseq;
431 
432 	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, dlen+UDP_RHDRSIZE));
433 
434 	relackq(r, bp);
435 	qunlock(ucb);
436 
437 	upriv->ustats.rudpOutDatagrams++;
438 
439 	DPRINT("sent: %lud/%lud, %lud/%lud\n",
440 		r->sndseq, r->sndgen, r->rcvseq, r->rcvgen);
441 
442 	doipoput(c, f, bp, 0, c->ttl, c->tos);
443 
444 	if(waserror()) {
445 		relput(r);
446 		qunlock(&r->lock);
447 		nexterror();
448 	}
449 
450 	/* flow control of sorts */
451 	qlock(&r->lock);
452 	if(UNACKED(r) > Maxunacked){
453 		r->blocked = 1;
454 		sleep(&r->vous, flow, r);
455 		r->blocked = 0;
456 	}
457 
458 	qunlock(&r->lock);
459 	relput(r);
460 	poperror();
461 }
462 
463 void
464 rudpiput(Proto *rudp, Ipifc *ifc, Block *bp)
465 {
466 	int len, olen, ottl;
467 	Udphdr *uh;
468 	Conv *c;
469 	Rudpcb *ucb;
470 	uchar raddr[IPaddrlen], laddr[IPaddrlen];
471 	ushort rport, lport;
472 	Rudppriv *upriv;
473 	Fs *f;
474 	uchar *p;
475 
476 	upriv = rudp->priv;
477 	f = rudp->f;
478 
479 	upriv->ustats.rudpInDatagrams++;
480 
481 	uh = (Udphdr*)(bp->rp);
482 
483 	/* Put back pseudo header for checksum
484 	 * (remember old values for icmpnoconv())
485 	 */
486 	ottl = uh->Unused;
487 	uh->Unused = 0;
488 	len = nhgets(uh->udplen);
489 	olen = nhgets(uh->udpplen);
490 	hnputs(uh->udpplen, len);
491 
492 	v4tov6(raddr, uh->udpsrc);
493 	v4tov6(laddr, uh->udpdst);
494 	lport = nhgets(uh->udpdport);
495 	rport = nhgets(uh->udpsport);
496 
497 	if(nhgets(uh->udpcksum)) {
498 		if(ptclcsum(bp, UDP_IPHDR, len+UDP_PHDRSIZE)) {
499 			upriv->ustats.rudpInErrors++;
500 			upriv->csumerr++;
501 			netlog(f, Logrudp, "rudp: checksum error %I\n", raddr);
502 			DPRINT("rudp: checksum error %I\n", raddr);
503 			freeblist(bp);
504 			return;
505 		}
506 	}
507 
508 	qlock(rudp);
509 
510 	c = iphtlook(&upriv->ht, raddr, rport, laddr, lport);
511 	if(c == nil){
512 		/* no conversation found */
513 		upriv->ustats.rudpNoPorts++;
514 		qunlock(rudp);
515 		netlog(f, Logudp, "udp: no conv %I!%d -> %I!%d\n", raddr, rport,
516 			laddr, lport);
517 		uh->Unused = ottl;
518 		hnputs(uh->udpplen, olen);
519 		icmpnoconv(f, bp);
520 		freeblist(bp);
521 		return;
522 	}
523 	ucb = (Rudpcb*)c->ptcl;
524 	qlock(ucb);
525 	qunlock(rudp);
526 
527 	if(reliput(c, bp, raddr, rport) < 0){
528 		qunlock(ucb);
529 		freeb(bp);
530 		return;
531 	}
532 
533 	/*
534 	 * Trim the packet down to data size
535 	 */
536 
537 	len -= (UDP_RHDRSIZE-UDP_PHDRSIZE);
538 	bp = trimblock(bp, UDP_IPHDR+UDP_RHDRSIZE, len);
539 	if(bp == nil) {
540 		netlog(f, Logrudp, "rudp: len err %I.%d -> %I.%d\n",
541 			raddr, rport, laddr, lport);
542 		DPRINT("rudp: len err %I.%d -> %I.%d\n",
543 			raddr, rport, laddr, lport);
544 		upriv->lenerr++;
545 		return;
546 	}
547 
548 	netlog(f, Logrudpmsg, "rudp: %I.%d -> %I.%d l %d\n",
549 		raddr, rport, laddr, lport, len);
550 
551 	switch(ucb->headers){
552 	case 7:
553 		/* pass the src address */
554 		bp = padblock(bp, UDP_USEAD7);
555 		p = bp->rp;
556 		ipmove(p, raddr); p += IPaddrlen;
557 		ipmove(p, laddr); p += IPaddrlen;
558 		ipmove(p, ifc->lifc->local); p += IPaddrlen;
559 		hnputs(p, rport); p += 2;
560 		hnputs(p, lport);
561 		break;
562 	default:
563 		/* connection oriented rudp */
564 		if(ipcmp(c->raddr, IPnoaddr) == 0){
565 			/* save the src address in the conversation */
566 		 	ipmove(c->raddr, raddr);
567 			c->rport = rport;
568 
569 			/* reply with the same ip address (if not broadcast) */
570 			if(ipforme(f, laddr) == Runi)
571 				ipmove(c->laddr, laddr);
572 			else
573 				v4tov6(c->laddr, ifc->lifc->local);
574 		}
575 		break;
576 	}
577 	if(bp->next)
578 		bp = concatblock(bp);
579 
580 	if(qfull(c->rq)) {
581 		netlog(f, Logrudp, "rudp: qfull %I.%d -> %I.%d\n", raddr, rport,
582 			laddr, lport);
583 		freeblist(bp);
584 	}
585 	else
586 		qpass(c->rq, bp);
587 
588 	qunlock(ucb);
589 }
590 
591 static char *rudpunknown = "unknown rudp ctl request";
592 
593 char*
594 rudpctl(Conv *c, char **f, int n)
595 {
596 	Rudpcb *ucb;
597 	uchar ip[IPaddrlen];
598 	int x;
599 
600 	ucb = (Rudpcb*)c->ptcl;
601 	if(n < 1)
602 		return rudpunknown;
603 
604 	if(strcmp(f[0], "headers") == 0){
605 		ucb->headers = 7;		/* new headers format */
606 		return nil;
607 	} else if(strcmp(f[0], "hangup") == 0){
608 		if(n < 3)
609 			return "bad syntax";
610 		if (parseip(ip, f[1]) == -1)
611 			return Ebadip;
612 		x = atoi(f[2]);
613 		qlock(ucb);
614 		relforget(c, ip, x, 1);
615 		qunlock(ucb);
616 		return nil;
617 	} else if(strcmp(f[0], "randdrop") == 0){
618 		x = 10;			/* default is 10% */
619 		if(n > 1)
620 			x = atoi(f[1]);
621 		if(x > 100 || x < 0)
622 			return "illegal rudp drop rate";
623 		ucb->randdrop = x;
624 		return nil;
625 	}
626 	return rudpunknown;
627 }
628 
629 void
630 rudpadvise(Proto *rudp, Block *bp, char *msg)
631 {
632 	Udphdr *h;
633 	uchar source[IPaddrlen], dest[IPaddrlen];
634 	ushort psource, pdest;
635 	Conv *s, **p;
636 
637 	h = (Udphdr*)(bp->rp);
638 
639 	v4tov6(dest, h->udpdst);
640 	v4tov6(source, h->udpsrc);
641 	psource = nhgets(h->udpsport);
642 	pdest = nhgets(h->udpdport);
643 
644 	/* Look for a connection */
645 	for(p = rudp->conv; *p; p++) {
646 		s = *p;
647 		if(s->rport == pdest)
648 		if(s->lport == psource)
649 		if(ipcmp(s->raddr, dest) == 0)
650 		if(ipcmp(s->laddr, source) == 0){
651 			qhangup(s->rq, msg);
652 			qhangup(s->wq, msg);
653 			break;
654 		}
655 	}
656 	freeblist(bp);
657 }
658 
659 int
660 rudpstats(Proto *rudp, char *buf, int len)
661 {
662 	Rudppriv *upriv;
663 
664 	upriv = rudp->priv;
665 	return snprint(buf, len, "%lud %lud %lud %lud %lud %lud\n",
666 		upriv->ustats.rudpInDatagrams,
667 		upriv->ustats.rudpNoPorts,
668 		upriv->ustats.rudpInErrors,
669 		upriv->ustats.rudpOutDatagrams,
670 		upriv->rxmits,
671 		upriv->orders);
672 }
673 
674 void
675 rudpinit(Fs *fs)
676 {
677 
678 	Proto *rudp;
679 
680 	rudp = smalloc(sizeof(Proto));
681 	rudp->priv = smalloc(sizeof(Rudppriv));
682 	rudp->name = "rudp";
683 	rudp->connect = rudpconnect;
684 	rudp->announce = rudpannounce;
685 	rudp->ctl = rudpctl;
686 	rudp->state = rudpstate;
687 	rudp->create = rudpcreate;
688 	rudp->close = rudpclose;
689 	rudp->rcv = rudpiput;
690 	rudp->advise = rudpadvise;
691 	rudp->stats = rudpstats;
692 	rudp->ipproto = IP_UDPPROTO;
693 	rudp->nc = 32;
694 	rudp->ptclsize = sizeof(Rudpcb);
695 
696 	Fsproto(fs, rudp);
697 }
698 
699 /*********************************************/
700 /* Here starts the reliable helper functions */
701 /*********************************************/
702 /*
703  *  Enqueue a copy of an unacked block for possible retransmissions
704  */
705 void
706 relackq(Reliable *r, Block *bp)
707 {
708 	Block *np;
709 
710 	np = copyblock(bp, blocklen(bp));
711 	if(r->unacked)
712 		r->unackedtail->list = np;
713 	else {
714 		/* restart timer */
715 		r->timeout = 0;
716 		r->xmits = 1;
717 		r->unacked = np;
718 	}
719 	r->unackedtail = np;
720 	np->list = nil;
721 }
722 
723 /*
724  *  retransmit unacked blocks
725  */
726 void
727 relackproc(void *a)
728 {
729 	Rudpcb *ucb;
730 	Proto *rudp;
731 	Reliable *r;
732 	Conv **s, *c;
733 
734 	rudp = (Proto *)a;
735 
736 loop:
737 	tsleep(&up->sleep, return0, 0, Rudptickms);
738 
739 	for(s = rudp->conv; *s; s++) {
740 		c = *s;
741 		ucb = (Rudpcb*)c->ptcl;
742 		qlock(ucb);
743 
744 		for(r = ucb->r; r; r = r->next) {
745 			if(r->unacked != nil){
746 				r->timeout += Rudptickms;
747 				if(r->timeout > Rudprxms*r->xmits)
748 					relrexmit(c, r);
749 			}
750 			if(r->acksent != r->rcvseq)
751 				relsendack(c, r, 0);
752 		}
753 		qunlock(ucb);
754 	}
755 	goto loop;
756 }
757 
758 /*
759  *  get the state record for a conversation
760  */
761 Reliable*
762 relstate(Rudpcb *ucb, uchar *addr, ushort port, char *from)
763 {
764 	Reliable *r, **l;
765 
766 	l = &ucb->r;
767 	for(r = *l; r; r = *l){
768 		if(memcmp(addr, r->addr, IPaddrlen) == 0 &&
769 		    port == r->port)
770 			break;
771 		l = &r->next;
772 	}
773 
774 	/* no state for this addr/port, create some */
775 	if(r == nil){
776 		while(generation == 0)
777 			generation = rand();
778 
779 		DPRINT("from %s new state %lud for %I!%ud\n",
780 		        from, generation, addr, port);
781 
782 		r = smalloc(sizeof(Reliable));
783 		memmove(r->addr, addr, IPaddrlen);
784 		r->port = port;
785 		r->unacked = 0;
786 		if(generation == Hangupgen)
787 			generation++;
788 		r->sndgen = generation++;
789 		r->sndseq = 0;
790 		r->ackrcvd = 0;
791 		r->rcvgen = 0;
792 		r->rcvseq = 0;
793 		r->acksent = 0;
794 		r->xmits = 0;
795 		r->timeout = 0;
796 		r->ref = 0;
797 		incref(r);	/* one reference for being in the list */
798 
799 		*l = r;
800 	}
801 
802 	incref(r);
803 	return r;
804 }
805 
806 void
807 relput(Reliable *r)
808 {
809 	if(decref(r) == 0)
810 		free(r);
811 }
812 
813 /*
814  *  forget a Reliable state
815  */
816 void
817 relforget(Conv *c, uchar *ip, int port, int originator)
818 {
819 	Rudpcb *ucb;
820 	Reliable *r, **l;
821 
822 	ucb = (Rudpcb*)c->ptcl;
823 
824 	l = &ucb->r;
825 	for(r = *l; r; r = *l){
826 		if(ipcmp(ip, r->addr) == 0 && port == r->port){
827 			*l = r->next;
828 			if(originator)
829 				relsendack(c, r, 1);
830 			relhangup(c, r);
831 			relput(r);	/* remove from the list */
832 			break;
833 		}
834 		l = &r->next;
835 	}
836 }
837 
838 /*
839  *  process a rcvd reliable packet. return -1 if not to be passed to user process,
840  *  0 otherwise.
841  *
842  *  called with ucb locked.
843  */
844 int
845 reliput(Conv *c, Block *bp, uchar *addr, ushort port)
846 {
847 	Block *nbp;
848 	Rudpcb *ucb;
849 	Rudppriv *upriv;
850 	Udphdr *uh;
851 	Reliable *r;
852 	Rudphdr *rh;
853 	ulong seq, ack, sgen, agen, ackreal;
854 	int rv = -1;
855 
856 	/* get fields */
857 	uh = (Udphdr*)(bp->rp);
858 	rh = (Rudphdr*)uh;
859 	seq = nhgetl(rh->relseq);
860 	sgen = nhgetl(rh->relsgen);
861 	ack = nhgetl(rh->relack);
862 	agen = nhgetl(rh->relagen);
863 
864 	upriv = c->p->priv;
865 	ucb = (Rudpcb*)c->ptcl;
866 	r = relstate(ucb, addr, port, "input");
867 
868 	DPRINT("rcvd %lud/%lud, %lud/%lud, r->sndgen = %lud\n",
869 		seq, sgen, ack, agen, r->sndgen);
870 
871 	/* if acking an incorrect generation, ignore */
872 	if(ack && agen != r->sndgen)
873 		goto out;
874 
875 	/* Look for a hangup */
876 	if(sgen == Hangupgen) {
877 		if(agen == r->sndgen)
878 			relforget(c, addr, port, 0);
879 		goto out;
880 	}
881 
882 	/* make sure we're not talking to a new remote side */
883 	if(r->rcvgen != sgen){
884 		if(seq != 0 && seq != 1)
885 			goto out;
886 
887 		/* new connection */
888 		if(r->rcvgen != 0){
889 			DPRINT("new con r->rcvgen = %lud, sgen = %lud\n", r->rcvgen, sgen);
890 			relhangup(c, r);
891 		}
892 		r->rcvgen = sgen;
893 	}
894 
895 	/* dequeue acked packets */
896 	if(ack && agen == r->sndgen){
897 		ackreal = 0;
898 		while(r->unacked != nil && INSEQ(ack, r->ackrcvd, r->sndseq)){
899 			nbp = r->unacked;
900 			r->unacked = nbp->list;
901 			DPRINT("%lud/%lud acked, r->sndgen = %lud\n",
902 			       ack, agen, r->sndgen);
903 			freeb(nbp);
904 			r->ackrcvd = NEXTSEQ(r->ackrcvd);
905 			ackreal = 1;
906 		}
907 
908 		/* flow control */
909 		if(UNACKED(r) < Maxunacked/8 && r->blocked)
910 			wakeup(&r->vous);
911 
912 		/*
913 		 *  retransmit next packet if the acked packet
914 		 *  was transmitted more than once
915 		 */
916 		if(ackreal && r->unacked != nil){
917 			r->timeout = 0;
918 			if(r->xmits > 1){
919 				r->xmits = 1;
920 				relrexmit(c, r);
921 			}
922 		}
923 
924 	}
925 
926 	/* no message or input queue full */
927 	if(seq == 0 || qfull(c->rq))
928 		goto out;
929 
930 	/* refuse out of order delivery */
931 	if(seq != NEXTSEQ(r->rcvseq)){
932 		relsendack(c, r, 0);	/* tell him we got it already */
933 		upriv->orders++;
934 		DPRINT("out of sequence %lud not %lud\n", seq, NEXTSEQ(r->rcvseq));
935 		goto out;
936 	}
937 	r->rcvseq = seq;
938 
939 	rv = 0;
940 out:
941 	relput(r);
942 	return rv;
943 }
944 
945 void
946 relsendack(Conv *c, Reliable *r, int hangup)
947 {
948 	Udphdr *uh;
949 	Block *bp;
950 	Rudphdr *rh;
951 	int ptcllen;
952 	Fs *f;
953 
954 	bp = allocb(UDP_IPHDR + UDP_RHDRSIZE);
955 	if(bp == nil)
956 		return;
957 	bp->wp += UDP_IPHDR + UDP_RHDRSIZE;
958 	f = c->p->f;
959 	uh = (Udphdr *)(bp->rp);
960 	uh->vihl = IP_VER4;
961 	rh = (Rudphdr*)uh;
962 
963 	ptcllen = (UDP_RHDRSIZE-UDP_PHDRSIZE);
964 	uh->Unused = 0;
965 	uh->udpproto = IP_UDPPROTO;
966 	uh->frag[0] = 0;
967 	uh->frag[1] = 0;
968 	hnputs(uh->udpplen, ptcllen);
969 
970 	v6tov4(uh->udpdst, r->addr);
971 	hnputs(uh->udpdport, r->port);
972 	hnputs(uh->udpsport, c->lport);
973 	if(ipcmp(c->laddr, IPnoaddr) == 0)
974 		findlocalip(f, c->laddr, c->raddr);
975 	v6tov4(uh->udpsrc, c->laddr);
976 	hnputs(uh->udplen, ptcllen);
977 
978 	if(hangup)
979 		hnputl(rh->relsgen, Hangupgen);
980 	else
981 		hnputl(rh->relsgen, r->sndgen);
982 	hnputl(rh->relseq, 0);
983 	hnputl(rh->relagen, r->rcvgen);
984 	hnputl(rh->relack, r->rcvseq);
985 
986 	if(r->acksent < r->rcvseq)
987 		r->acksent = r->rcvseq;
988 
989 	uh->udpcksum[0] = 0;
990 	uh->udpcksum[1] = 0;
991 	hnputs(uh->udpcksum, ptclcsum(bp, UDP_IPHDR, UDP_RHDRSIZE));
992 
993 	DPRINT("sendack: %lud/%lud, %lud/%lud\n", 0L, r->sndgen, r->rcvseq, r->rcvgen);
994 	doipoput(c, f, bp, 0, c->ttl, c->tos);
995 }
996 
997 
998 /*
999  *  called with ucb locked (and c locked if user initiated close)
1000  */
1001 void
1002 relhangup(Conv *c, Reliable *r)
1003 {
1004 	int n;
1005 	Block *bp;
1006 	char hup[ERRMAX];
1007 
1008 	n = snprint(hup, sizeof(hup), "hangup %I!%d", r->addr, r->port);
1009 	qproduce(c->eq, hup, n);
1010 
1011 	/*
1012 	 *  dump any unacked outgoing messages
1013 	 */
1014 	for(bp = r->unacked; bp != nil; bp = r->unacked){
1015 		r->unacked = bp->list;
1016 		bp->list = nil;
1017 		freeb(bp);
1018 	}
1019 
1020 	r->rcvgen = 0;
1021 	r->rcvseq = 0;
1022 	r->acksent = 0;
1023 	if(generation == Hangupgen)
1024 		generation++;
1025 	r->sndgen = generation++;
1026 	r->sndseq = 0;
1027 	r->ackrcvd = 0;
1028 	r->xmits = 0;
1029 	r->timeout = 0;
1030 	wakeup(&r->vous);
1031 }
1032 
1033 /*
1034  *  called with ucb locked
1035  */
1036 void
1037 relrexmit(Conv *c, Reliable *r)
1038 {
1039 	Rudppriv *upriv;
1040 	Block *np;
1041 	Fs *f;
1042 
1043 	upriv = c->p->priv;
1044 	f = c->p->f;
1045 	r->timeout = 0;
1046 	if(r->xmits++ > Rudpmaxxmit){
1047 		relhangup(c, r);
1048 		return;
1049 	}
1050 
1051 	upriv->rxmits++;
1052 	np = copyblock(r->unacked, blocklen(r->unacked));
1053 	DPRINT("rxmit r->ackrvcd+1 = %lud\n", r->ackrcvd+1);
1054 	doipoput(c, f, np, 0, c->ttl, c->tos);
1055 }
1056