xref: /plan9-contrib/sys/src/9/port/devloopback.c (revision d46c239f8612929b7dbade67d0d071633df3a15d)
1 #include	"u.h"
2 #include	"../port/lib.h"
3 #include	"mem.h"
4 #include	"dat.h"
5 #include	"fns.h"
6 #include	"../port/error.h"
7 
8 typedef struct Link	Link;
9 typedef struct Loop	Loop;
10 
11 struct Link
12 {
13 	Lock;
14 
15 	int	ref;
16 
17 	long	packets;	/* total number of packets sent */
18 	long	bytes;		/* total number of bytes sent */
19 	int	indrop;		/* enable dropping on iq overflow */
20 	long	soverflows;	/* packets dropped because iq overflowed */
21 	long	droprate;	/* drop 1/droprate packets in tq */
22 	long	drops;		/* packets deliberately dropped */
23 
24 	vlong	delay0ns;	/* nanosec of delay in the link */
25 	long	delaynns;	/* nanosec of delay per byte */
26 	vlong	delay0;		/* fastticks of delay */
27 	long	delayn;
28 
29 	Block	*tq;		/* transmission queue */
30 	Block	*tqtail;
31 	vlong	tout;		/* time the last packet in tq is really out */
32 	vlong	tin;		/* time the head packet in tq enters the remote side  */
33 
34 	long	limit;		/* queue buffering limit */
35 	Queue	*oq;		/* output queue from other side & packets in the link */
36 	Queue	*iq;
37 
38 	Timer	ci;		/* time to move packets from  next packet from oq */
39 };
40 
41 struct Loop
42 {
43 	QLock;
44 	int	ref;
45 	int	minmtu;		/* smallest block transmittable */
46 	Loop	*next;
47 	ulong	path;
48 	Link	link[2];
49 };
50 
51 static struct
52 {
53 	Lock;
54 	ulong	path;
55 } loopbackalloc;
56 
57 enum
58 {
59 	Qtopdir=	1,		/* top level directory */
60 
61 	Qloopdir,			/* loopback* directory */
62 
63 	Qportdir,			/* directory each end of the loop */
64 	Qctl,
65 	Qstatus,
66 	Qstats,
67 	Qdata,
68 
69 	MaxQ,
70 
71 	Nloopbacks	= 5,
72 
73 	Statelen	= 23*1024,	/* status buffer size */
74 
75 	Tmsize		= 8,
76 	Delayn 		= 10000,	/* default delays */
77 	Delay0 		= 2500000,
78 
79 	Loopqlim	= 32*1024,	/* default size of queues */
80 };
81 
82 static Dirtab loopportdir[] =
83 {
84 	"ctl",		{Qctl},		0,			0222,
85 	"status",	{Qstatus},	0,			0444,
86 	"stats",	{Qstats},	0,			0444,
87 	"data",		{Qdata},	0,			0666,
88 };
89 static Dirtab loopdirs[MaxQ];
90 
91 static Loop	loopbacks[Nloopbacks];
92 
93 static uvlong	fasthz;
94 
95 #define TYPE(x) 	(((ulong)(x))&0xff)
96 #define ID(x) 		(((ulong)(x))>>8)
97 #define QID(x,y) 	((((ulong)(x))<<8)|((ulong)(y)))
98 
99 #define NS2FASTHZ(t)	((fasthz*(t))/1000000000);
100 
101 static void	looper(Loop *lb);
102 static long	loopoput(Loop *lb, Link *link, Block *bp);
103 static void	ptime(uchar *p, vlong t);
104 static vlong	gtime(uchar *p);
105 static void	closelink(Link *link, int dofree);
106 static void	pushlink(Link *link, vlong now);
107 static void	freelb(Loop *lb);
108 static void	linkintr(Ureg*, Timer *ci);
109 
110 static void
111 loopbackinit(void)
112 {
113 	int i;
114 
115 	for(i = 0; i < Nloopbacks; i++)
116 		loopbacks[i].path = i;
117 
118 	/* invert directory tables for non-directory entries */
119 	for(i=0; i<nelem(loopportdir); i++)
120 		loopdirs[loopportdir[i].qid.path] = loopportdir[i];
121 }
122 
123 static Chan*
124 loopbackattach(char *spec)
125 {
126 	Loop *volatile lb;
127 	Queue *q;
128 	Chan *c;
129 	int chan;
130 	int dev;
131 
132 	dev = 0;
133 	if(spec != nil){
134 		dev = atoi(spec);
135 		if(dev >= Nloopbacks)
136 			error(Ebadspec);
137 	}
138 
139 	c = devattach('X', spec);
140 	lb = &loopbacks[dev];
141 
142 	qlock(lb);
143 	if(waserror()){
144 		lb->ref--;
145 		qunlock(lb);
146 		nexterror();
147 	}
148 
149 	lb->ref++;
150 	if(lb->ref == 1){
151 		fastticks(&fasthz);
152 		for(chan = 0; chan < 2; chan++){
153 			lb->link[chan].ci.a = &lb->link[chan];
154 			lb->link[chan].ci.f = linkintr;
155 			lb->link[chan].limit = Loopqlim;
156 			q = qopen(lb->link[chan].limit, 0, 0, 0);
157 			lb->link[chan].iq = q;
158 			if(q == nil){
159 				freelb(lb);
160 				exhausted("memory");
161 			}
162 			q = qopen(lb->link[chan].limit, 0, 0, 0);
163 			lb->link[chan].oq = q;
164 			if(q == nil){
165 				freelb(lb);
166 				exhausted("memory");
167 			}
168 			lb->link[chan].indrop = 1;
169 
170 			lb->link[chan].delayn = NS2FASTHZ(Delayn);
171 			lb->link[chan].delaynns = Delayn;
172 			lb->link[chan].delay0 = NS2FASTHZ(Delay0);
173 			lb->link[chan].delay0ns = Delay0;
174 		}
175 	}
176 	poperror();
177 	qunlock(lb);
178 
179 	mkqid(&c->qid, QID(0, Qtopdir), 0, QTDIR);
180 	c->aux = lb;
181 	c->dev = dev;
182 	return c;
183 }
184 
185 static int
186 loopbackgen(Chan *c, char*, Dirtab*, int, int i, Dir *dp)
187 {
188 	Loop *lb;
189 	Dirtab *tab;
190 	int len, type;
191 	Qid qid;
192 
193 	type = TYPE(c->qid.path);
194 	if(i == DEVDOTDOT){
195 		switch(type){
196 		case Qtopdir:
197 		case Qloopdir:
198 			snprint(up->genbuf, sizeof(up->genbuf), "#X%ld", c->dev);
199 			mkqid(&qid, QID(0, Qtopdir), 0, QTDIR);
200 			devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
201 			break;
202 		case Qportdir:
203 			snprint(up->genbuf, sizeof(up->genbuf), "loopback%ld", c->dev);
204 			mkqid(&qid, QID(0, Qloopdir), 0, QTDIR);
205 			devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
206 			break;
207 		default:
208 			panic("loopbackgen %llux", c->qid.path);
209 		}
210 		return 1;
211 	}
212 
213 	switch(type){
214 	case Qtopdir:
215 		if(i != 0)
216 			return -1;
217 		snprint(up->genbuf, sizeof(up->genbuf), "loopback%ld", c->dev);
218 		mkqid(&qid, QID(0, Qloopdir), 0, QTDIR);
219 		devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
220 		return 1;
221 	case Qloopdir:
222 		if(i >= 2)
223 			return -1;
224 		snprint(up->genbuf, sizeof(up->genbuf), "%d", i);
225 		mkqid(&qid, QID(i, QID(0, Qportdir)), 0, QTDIR);
226 		devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
227 		return 1;
228 	case Qportdir:
229 		if(i >= nelem(loopportdir))
230 			return -1;
231 		tab = &loopportdir[i];
232 		mkqid(&qid, QID(ID(c->qid.path), tab->qid.path), 0, QTFILE);
233 		devdir(c, qid, tab->name, tab->length, eve, tab->perm, dp);
234 		return 1;
235 	default:
236 		/* non directory entries end up here; must be in lowest level */
237 		if(c->qid.type & QTDIR)
238 			panic("loopbackgen: unexpected directory");
239 		if(i != 0)
240 			return -1;
241 		tab = &loopdirs[type];
242 		if(tab == nil)
243 			panic("loopbackgen: unknown type: %d", type);
244 		len = tab->length;
245 		if(type == Qdata){
246 			lb = c->aux;
247 			len = qlen(lb->link[ID(c->qid.path)].iq);
248 		}
249 		devdir(c, c->qid, tab->name, len, eve, tab->perm, dp);
250 		return 1;
251 	}
252 }
253 
254 
255 static Walkqid*
256 loopbackwalk(Chan *c, Chan *nc, char **name, int nname)
257 {
258 	Walkqid *wq;
259 	Loop *lb;
260 
261 	wq = devwalk(c, nc, name, nname, nil, 0, loopbackgen);
262 	if(wq != nil && wq->clone != nil && wq->clone != c){
263 		lb = c->aux;
264 		qlock(lb);
265 		lb->ref++;
266 		if((c->flag & COPEN) && TYPE(c->qid.path) == Qdata)
267 			lb->link[ID(c->qid.path)].ref++;
268 		qunlock(lb);
269 	}
270 	return wq;
271 }
272 
273 static int
274 loopbackstat(Chan *c, uchar *db, int n)
275 {
276 	return devstat(c, db, n, nil, 0, loopbackgen);
277 }
278 
279 /*
280  *  if the stream doesn't exist, create it
281  */
282 static Chan*
283 loopbackopen(Chan *c, int omode)
284 {
285 	Loop *lb;
286 
287 	if(c->qid.type & QTDIR){
288 		if(omode != OREAD)
289 			error(Ebadarg);
290 		c->mode = omode;
291 		c->flag |= COPEN;
292 		c->offset = 0;
293 		return c;
294 	}
295 
296 	lb = c->aux;
297 	qlock(lb);
298 	if(TYPE(c->qid.path) == Qdata){
299 		if(lb->link[ID(c->qid.path)].ref){
300 			qunlock(lb);
301 			error(Einuse);
302 		}
303 		lb->link[ID(c->qid.path)].ref++;
304 	}
305 	qunlock(lb);
306 
307 	c->mode = openmode(omode);
308 	c->flag |= COPEN;
309 	c->offset = 0;
310 	c->iounit = qiomaxatomic;
311 	return c;
312 }
313 
314 static void
315 loopbackclose(Chan *c)
316 {
317 	Loop *lb;
318 	int ref, chan;
319 
320 	lb = c->aux;
321 
322 	qlock(lb);
323 
324 	/*
325 	 * closing either side hangs up the stream
326 	 */
327 	if((c->flag & COPEN) && TYPE(c->qid.path) == Qdata){
328 		chan = ID(c->qid.path);
329 		if(--lb->link[chan].ref == 0){
330 			qhangup(lb->link[chan ^ 1].oq, nil);
331 			looper(lb);
332 		}
333 	}
334 
335 
336 	/*
337 	 *  if both sides are closed, they are reusable
338 	 */
339 	if(lb->link[0].ref == 0 && lb->link[1].ref == 0){
340 		for(chan = 0; chan < 2; chan++){
341 			closelink(&lb->link[chan], 0);
342 			qreopen(lb->link[chan].iq);
343 			qreopen(lb->link[chan].oq);
344 			qsetlimit(lb->link[chan].oq, lb->link[chan].limit);
345 			qsetlimit(lb->link[chan].iq, lb->link[chan].limit);
346 		}
347 	}
348 	ref = --lb->ref;
349 	if(ref == 0)
350 		freelb(lb);
351 	qunlock(lb);
352 }
353 
354 static void
355 freelb(Loop *lb)
356 {
357 	int chan;
358 
359 	for(chan = 0; chan < 2; chan++)
360 		closelink(&lb->link[chan], 1);
361 }
362 
363 /*
364  * called with the Loop qlocked,
365  * so only pushlink can mess with the queues
366  */
367 static void
368 closelink(Link *link, int dofree)
369 {
370 	Queue *iq, *oq;
371 	Block *bp;
372 
373 	ilock(link);
374 	iq = link->iq;
375 	oq = link->oq;
376 	bp = link->tq;
377 	link->tq = nil;
378 	link->tqtail = nil;
379 	link->tout = 0;
380 	link->tin = 0;
381 	timerdel(&link->ci);
382 	iunlock(link);
383 	if(iq != nil){
384 		qclose(iq);
385 		if(dofree){
386 			ilock(link);
387 			free(iq);
388 			link->iq = nil;
389 			iunlock(link);
390 		}
391 	}
392 	if(oq != nil){
393 		qclose(oq);
394 		if(dofree){
395 			ilock(link);
396 			free(oq);
397 			link->oq = nil;
398 			iunlock(link);
399 		}
400 	}
401 	freeblist(bp);
402 }
403 
404 static long
405 loopbackread(Chan *c, void *va, long n, vlong offset)
406 {
407 	Loop *lb;
408 	Link *link;
409 	char *buf;
410 	long rv;
411 
412 	lb = c->aux;
413 	switch(TYPE(c->qid.path)){
414 	default:
415 		error(Eperm);
416 		return -1;	/* not reached */
417 	case Qtopdir:
418 	case Qloopdir:
419 	case Qportdir:
420 		return devdirread(c, va, n, nil, 0, loopbackgen);
421 	case Qdata:
422 		return qread(lb->link[ID(c->qid.path)].iq, va, n);
423 	case Qstatus:
424 		link = &lb->link[ID(c->qid.path)];
425 		buf = smalloc(Statelen);
426 		rv = snprint(buf, Statelen, "delay %lld %ld\n", link->delay0ns, link->delaynns);
427 		rv += snprint(buf+rv, Statelen-rv, "limit %ld\n", link->limit);
428 		rv += snprint(buf+rv, Statelen-rv, "indrop %d\n", link->indrop);
429 		snprint(buf+rv, Statelen-rv, "droprate %ld\n", link->droprate);
430 		rv = readstr(offset, va, n, buf);
431 		free(buf);
432 		break;
433 	case Qstats:
434 		link = &lb->link[ID(c->qid.path)];
435 		buf = smalloc(Statelen);
436 		rv = snprint(buf, Statelen, "packets: %ld\n", link->packets);
437 		rv += snprint(buf+rv, Statelen-rv, "bytes: %ld\n", link->bytes);
438 		rv += snprint(buf+rv, Statelen-rv, "dropped: %ld\n", link->drops);
439 		snprint(buf+rv, Statelen-rv, "soft overflows: %ld\n", link->soverflows);
440 		rv = readstr(offset, va, n, buf);
441 		free(buf);
442 		break;
443 	}
444 	return rv;
445 }
446 
447 static Block*
448 loopbackbread(Chan *c, long n, ulong offset)
449 {
450 	Loop *lb;
451 
452 	lb = c->aux;
453 	if(TYPE(c->qid.path) == Qdata)
454 		return qbread(lb->link[ID(c->qid.path)].iq, n);
455 
456 	return devbread(c, n, offset);
457 }
458 
459 static long
460 loopbackbwrite(Chan *c, Block *bp, ulong off)
461 {
462 	Loop *lb;
463 
464 	lb = c->aux;
465 	if(TYPE(c->qid.path) == Qdata)
466 		return loopoput(lb, &lb->link[ID(c->qid.path) ^ 1], bp);
467 	return devbwrite(c, bp, off);
468 }
469 
470 static long
471 loopbackwrite(Chan *c, void *va, long n, vlong off)
472 {
473 	Loop *lb;
474 	Link *link;
475 	Cmdbuf *volatile cb;
476 	Block *volatile bp;
477 	vlong d0, d0ns;
478 	long dn, dnns;
479 
480 	switch(TYPE(c->qid.path)){
481 	case Qdata:
482 		bp = allocb(n);
483 		if(waserror()){
484 			freeb(bp);
485 			nexterror();
486 		}
487 		memmove(bp->wp, va, n);
488 		poperror();
489 		bp->wp += n;
490 		return loopbackbwrite(c, bp, off);
491 	case Qctl:
492 		lb = c->aux;
493 		link = &lb->link[ID(c->qid.path)];
494 		cb = parsecmd(va, n);
495 		if(waserror()){
496 			free(cb);
497 			nexterror();
498 		}
499 		if(cb->nf < 1)
500 			error("short control request");
501 		if(strcmp(cb->f[0], "delay") == 0){
502 			if(cb->nf != 3)
503 				error("usage: delay latency bytedelay");
504 			d0ns = strtoll(cb->f[1], nil, 10);
505 			dnns = strtol(cb->f[2], nil, 10);
506 
507 			/*
508 			 * it takes about 20000 cycles on a pentium ii
509 			 * to run pushlink; perhaps this should be accounted.
510 			 */
511 			d0 = NS2FASTHZ(d0ns);
512 			dn = NS2FASTHZ(dnns);
513 
514 			ilock(link);
515 			link->delay0 = d0;
516 			link->delayn = dn;
517 			link->delay0ns = d0ns;
518 			link->delaynns = dnns;
519 			iunlock(link);
520 		}else if(strcmp(cb->f[0], "indrop") == 0){
521 			if(cb->nf != 2)
522 				error("usage: indrop [01]");
523 			ilock(link);
524 			link->indrop = strtol(cb->f[1], nil, 0) != 0;
525 			iunlock(link);
526 		}else if(strcmp(cb->f[0], "droprate") == 0){
527 			if(cb->nf != 2)
528 				error("usage: droprate ofn");
529 			ilock(link);
530 			link->droprate = strtol(cb->f[1], nil, 0);
531 			iunlock(link);
532 		}else if(strcmp(cb->f[0], "limit") == 0){
533 			if(cb->nf != 2)
534 				error("usage: limit maxqsize");
535 			ilock(link);
536 			link->limit = strtol(cb->f[1], nil, 0);
537 			qsetlimit(link->oq, link->limit);
538 			qsetlimit(link->iq, link->limit);
539 			iunlock(link);
540 		}else if(strcmp(cb->f[0], "reset") == 0){
541 			if(cb->nf != 1)
542 				error("usage: reset");
543 			ilock(link);
544 			link->packets = 0;
545 			link->bytes = 0;
546 			link->indrop = 0;
547 			link->soverflows = 0;
548 			link->drops = 0;
549 			iunlock(link);
550 		}else
551 			error("unknown control request");
552 		poperror();
553 		free(cb);
554 		break;
555 	default:
556 		error(Eperm);
557 	}
558 
559 	return n;
560 }
561 
562 static long
563 loopoput(Loop *lb, Link *link, Block *volatile bp)
564 {
565 	long n;
566 
567 	n = BLEN(bp);
568 
569 	/* make it a single block with space for the loopback timing header */
570 	if(waserror()){
571 		freeb(bp);
572 		nexterror();
573 	}
574 	bp = padblock(bp, Tmsize);
575 	if(bp->next)
576 		bp = concatblock(bp);
577 	if(BLEN(bp) < lb->minmtu)
578 		bp = adjustblock(bp, lb->minmtu);
579 	poperror();
580 	ptime(bp->rp, fastticks(nil));
581 
582 	link->packets++;
583 	link->bytes += n;
584 
585 	qbwrite(link->oq, bp);
586 
587 	looper(lb);
588 	return n;
589 }
590 
591 static void
592 looper(Loop *lb)
593 {
594 	vlong t;
595 	int chan;
596 
597 	t = fastticks(nil);
598 	for(chan = 0; chan < 2; chan++)
599 		pushlink(&lb->link[chan], t);
600 }
601 
602 static void
603 linkintr(Ureg*, Timer *ci)
604 {
605 	Link *link;
606 
607 	link = ci->a;
608 	pushlink(link, ci->when);
609 }
610 
611 /*
612  * move blocks between queues if they are ready.
613  * schedule an interrupt for the next interesting time.
614  *
615  * must be called with the link ilocked.
616  */
617 static void
618 pushlink(Link *link, vlong now)
619 {
620 	Block *bp;
621 	vlong tout, tin;
622 
623 	/*
624 	 * put another block in the link queue
625 	 */
626 	ilock(link);
627 	if(link->iq == nil || link->oq == nil){
628 		iunlock(link);
629 		return;
630 
631 	}
632 	timerdel(&link->ci);
633 
634 	/*
635 	 * put more blocks into the xmit queue
636 	 * use the time the last packet was supposed to go out
637 	 * as the start time for the next packet, rather than
638 	 * the current time.  this more closely models a network
639 	 * device which can queue multiple output packets.
640 	 */
641 	tout = link->tout;
642 	if(!tout)
643 		tout = now;
644 	while(tout <= now){
645 		bp = qget(link->oq);
646 		if(bp == nil){
647 			tout = 0;
648 			break;
649 		}
650 
651 		/*
652 		 * can't send the packet before it gets queued
653 		 */
654 		tin = gtime(bp->rp);
655 		if(tin > tout)
656 			tout = tin;
657 		tout = tout + (BLEN(bp) - Tmsize) * link->delayn;
658 
659 		/*
660 		 * drop packets
661 		 */
662 		if(link->droprate && nrand(link->droprate) == 0)
663 			link->drops++;
664 		else{
665 			ptime(bp->rp, tout + link->delay0);
666 			if(link->tq == nil)
667 				link->tq = bp;
668 			else
669 				link->tqtail->next = bp;
670 			link->tqtail = bp;
671 		}
672 	}
673 
674 	/*
675 	 * record the next time a packet can be sent,
676 	 * but don't schedule an interrupt if none is waiting
677 	 */
678 	link->tout = tout;
679 	if(!qcanread(link->oq))
680 		tout = 0;
681 
682 	/*
683 	 * put more blocks into the receive queue
684 	 */
685 	tin = 0;
686 	while(bp = link->tq){
687 		tin = gtime(bp->rp);
688 		if(tin > now)
689 			break;
690 		bp->rp += Tmsize;
691 		link->tq = bp->next;
692 		bp->next = nil;
693 		if(!link->indrop)
694 			qpassnolim(link->iq, bp);
695 		else if(qpass(link->iq, bp) < 0)
696 			link->soverflows++;
697 		tin = 0;
698 	}
699 	if(bp == nil && qisclosed(link->oq) && !qcanread(link->oq) && !qisclosed(link->iq))
700 		qhangup(link->iq, nil);
701 	link->tin = tin;
702 	if(!tin || tin > tout && tout)
703 		tin = tout;
704 
705 	link->ci.when = tin;
706 	if(tin){
707 		if(tin < now)
708 			panic("loopback unfinished business");
709 		timeradd(&link->ci);
710 	}
711 	iunlock(link);
712 }
713 
714 static void
715 ptime(uchar *p, vlong t)
716 {
717 	ulong tt;
718 
719 	tt = t >> 32;
720 	p[0] = tt >> 24;
721 	p[1] = tt >> 16;
722 	p[2] = tt >> 8;
723 	p[3] = tt;
724 	tt = t;
725 	p[4] = tt >> 24;
726 	p[5] = tt >> 16;
727 	p[6] = tt >> 8;
728 	p[7] = tt;
729 }
730 
731 static vlong
732 gtime(uchar *p)
733 {
734 	ulong t1, t2;
735 
736 	t1 = (p[0] << 24) | (p[1] << 16) | (p[2] << 8) | p[3];
737 	t2 = (p[4] << 24) | (p[5] << 16) | (p[6] << 8) | p[7];
738 	return ((vlong)t1 << 32) | t2;
739 }
740 
741 Dev loopbackdevtab = {
742 	'X',
743 	"loopback",
744 
745 	devreset,
746 	loopbackinit,
747 	devshutdown,
748 	loopbackattach,
749 	loopbackwalk,
750 	loopbackstat,
751 	loopbackopen,
752 	devcreate,
753 	loopbackclose,
754 	loopbackread,
755 	loopbackbread,
756 	loopbackwrite,
757 	loopbackbwrite,
758 	devremove,
759 	devwstat,
760 };
761