xref: /plan9/sys/src/9/port/devloopback.c (revision ff8c3af2f44d95267f67219afa20ba82ff6cf7e4)
1 #include	"u.h"
2 #include	"../port/lib.h"
3 #include	"mem.h"
4 #include	"dat.h"
5 #include	"fns.h"
6 #include	"../port/error.h"
7 
8 typedef struct Link	Link;
9 typedef struct Loop	Loop;
10 
11 struct Link
12 {
13 	Lock;
14 
15 	int	ref;
16 
17 	long	packets;	/* total number of packets sent */
18 	long	bytes;		/* total number of bytes sent */
19 	int	indrop;		/* enable dropping on iq overflow */
20 	long	soverflows;	/* packets dropped because iq overflowed */
21 	long	droprate;	/* drop 1/droprate packets in tq */
22 	long	drops;		/* packets deliberately dropped */
23 
24 	vlong	delay0ns;	/* nanosec of delay in the link */
25 	long	delaynns;	/* nanosec of delay per byte */
26 
27 	Block	*tq;		/* transmission queue */
28 	Block	*tqtail;
29 	vlong	tout;		/* time the last packet in tq is really out */
30 	vlong	tin;		/* time the head packet in tq enters the remote side  */
31 
32 	long	limit;		/* queue buffering limit */
33 	Queue	*oq;		/* output queue from other side & packets in the link */
34 	Queue	*iq;
35 
36 	Timer	ci;		/* time to move packets from  next packet from oq */
37 };
38 
39 struct Loop
40 {
41 	QLock;
42 	int	ref;
43 	int	minmtu;		/* smallest block transmittable */
44 	Loop	*next;
45 	ulong	path;
46 	Link	link[2];
47 };
48 
49 static struct
50 {
51 	Lock;
52 	ulong	path;
53 } loopbackalloc;
54 
55 enum
56 {
57 	Qtopdir=	1,		/* top level directory */
58 
59 	Qloopdir,			/* loopback* directory */
60 
61 	Qportdir,			/* directory each end of the loop */
62 	Qctl,
63 	Qstatus,
64 	Qstats,
65 	Qdata,
66 
67 	MaxQ,
68 
69 	Nloopbacks	= 5,
70 
71 	Statelen	= 23*1024,	/* status buffer size */
72 
73 	Tmsize		= 8,
74 	Delayn 		= 10000,	/* default delays in ns */
75 	Delay0 		= 2500000,
76 
77 	Loopqlim	= 32*1024,	/* default size of queues */
78 };
79 
80 static Dirtab loopportdir[] =
81 {
82 	"ctl",		{Qctl},		0,			0222,
83 	"status",	{Qstatus},	0,			0444,
84 	"stats",	{Qstats},	0,			0444,
85 	"data",		{Qdata},	0,			0666,
86 };
87 static Dirtab loopdirs[MaxQ];
88 
89 static Loop	loopbacks[Nloopbacks];
90 
91 #define TYPE(x) 	(((ulong)(x))&0xff)
92 #define ID(x) 		(((ulong)(x))>>8)
93 #define QID(x,y) 	((((ulong)(x))<<8)|((ulong)(y)))
94 
95 static void	looper(Loop *lb);
96 static long	loopoput(Loop *lb, Link *link, Block *bp);
97 static void	ptime(uchar *p, vlong t);
98 static vlong	gtime(uchar *p);
99 static void	closelink(Link *link, int dofree);
100 static void	pushlink(Link *link, vlong now);
101 static void	freelb(Loop *lb);
102 static void	linkintr(Ureg*, Timer *ci);
103 
104 static void
105 loopbackinit(void)
106 {
107 	int i;
108 
109 	for(i = 0; i < Nloopbacks; i++)
110 		loopbacks[i].path = i;
111 
112 	/* invert directory tables for non-directory entries */
113 	for(i=0; i<nelem(loopportdir); i++)
114 		loopdirs[loopportdir[i].qid.path] = loopportdir[i];
115 }
116 
117 static Chan*
118 loopbackattach(char *spec)
119 {
120 	Loop *volatile lb;
121 	Queue *q;
122 	Chan *c;
123 	int chan;
124 	int dev;
125 
126 	dev = 0;
127 	if(spec != nil){
128 		dev = atoi(spec);
129 		if(dev >= Nloopbacks)
130 			error(Ebadspec);
131 	}
132 
133 	c = devattach('X', spec);
134 	lb = &loopbacks[dev];
135 
136 	qlock(lb);
137 	if(waserror()){
138 		lb->ref--;
139 		qunlock(lb);
140 		nexterror();
141 	}
142 
143 	lb->ref++;
144 	if(lb->ref == 1){
145 		for(chan = 0; chan < 2; chan++){
146 			lb->link[chan].ci.mode = Tabsolute;
147 			lb->link[chan].ci.a = &lb->link[chan];
148 			lb->link[chan].ci.f = linkintr;
149 			lb->link[chan].limit = Loopqlim;
150 			q = qopen(lb->link[chan].limit, 0, 0, 0);
151 			lb->link[chan].iq = q;
152 			if(q == nil){
153 				freelb(lb);
154 				exhausted("memory");
155 			}
156 			q = qopen(lb->link[chan].limit, 0, 0, 0);
157 			lb->link[chan].oq = q;
158 			if(q == nil){
159 				freelb(lb);
160 				exhausted("memory");
161 			}
162 			lb->link[chan].indrop = 1;
163 
164 			lb->link[chan].delaynns = Delayn;
165 			lb->link[chan].delay0ns = Delay0;
166 		}
167 	}
168 	poperror();
169 	qunlock(lb);
170 
171 	mkqid(&c->qid, QID(0, Qtopdir), 0, QTDIR);
172 	c->aux = lb;
173 	c->dev = dev;
174 	return c;
175 }
176 
177 static int
178 loopbackgen(Chan *c, char*, Dirtab*, int, int i, Dir *dp)
179 {
180 	Loop *lb;
181 	Dirtab *tab;
182 	int len, type;
183 	Qid qid;
184 
185 	type = TYPE(c->qid.path);
186 	if(i == DEVDOTDOT){
187 		switch(type){
188 		case Qtopdir:
189 		case Qloopdir:
190 			snprint(up->genbuf, sizeof(up->genbuf), "#X%ld", c->dev);
191 			mkqid(&qid, QID(0, Qtopdir), 0, QTDIR);
192 			devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
193 			break;
194 		case Qportdir:
195 			snprint(up->genbuf, sizeof(up->genbuf), "loopback%ld", c->dev);
196 			mkqid(&qid, QID(0, Qloopdir), 0, QTDIR);
197 			devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
198 			break;
199 		default:
200 			panic("loopbackgen %llux", c->qid.path);
201 		}
202 		return 1;
203 	}
204 
205 	switch(type){
206 	case Qtopdir:
207 		if(i != 0)
208 			return -1;
209 		snprint(up->genbuf, sizeof(up->genbuf), "loopback%ld", c->dev);
210 		mkqid(&qid, QID(0, Qloopdir), 0, QTDIR);
211 		devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
212 		return 1;
213 	case Qloopdir:
214 		if(i >= 2)
215 			return -1;
216 		snprint(up->genbuf, sizeof(up->genbuf), "%d", i);
217 		mkqid(&qid, QID(i, QID(0, Qportdir)), 0, QTDIR);
218 		devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
219 		return 1;
220 	case Qportdir:
221 		if(i >= nelem(loopportdir))
222 			return -1;
223 		tab = &loopportdir[i];
224 		mkqid(&qid, QID(ID(c->qid.path), tab->qid.path), 0, QTFILE);
225 		devdir(c, qid, tab->name, tab->length, eve, tab->perm, dp);
226 		return 1;
227 	default:
228 		/* non directory entries end up here; must be in lowest level */
229 		if(c->qid.type & QTDIR)
230 			panic("loopbackgen: unexpected directory");
231 		if(i != 0)
232 			return -1;
233 		tab = &loopdirs[type];
234 		if(tab == nil)
235 			panic("loopbackgen: unknown type: %d", type);
236 		len = tab->length;
237 		if(type == Qdata){
238 			lb = c->aux;
239 			len = qlen(lb->link[ID(c->qid.path)].iq);
240 		}
241 		devdir(c, c->qid, tab->name, len, eve, tab->perm, dp);
242 		return 1;
243 	}
244 }
245 
246 
247 static Walkqid*
248 loopbackwalk(Chan *c, Chan *nc, char **name, int nname)
249 {
250 	Walkqid *wq;
251 	Loop *lb;
252 
253 	wq = devwalk(c, nc, name, nname, nil, 0, loopbackgen);
254 	if(wq != nil && wq->clone != nil && wq->clone != c){
255 		lb = c->aux;
256 		qlock(lb);
257 		lb->ref++;
258 		if((c->flag & COPEN) && TYPE(c->qid.path) == Qdata)
259 			lb->link[ID(c->qid.path)].ref++;
260 		qunlock(lb);
261 	}
262 	return wq;
263 }
264 
265 static int
266 loopbackstat(Chan *c, uchar *db, int n)
267 {
268 	return devstat(c, db, n, nil, 0, loopbackgen);
269 }
270 
271 /*
272  *  if the stream doesn't exist, create it
273  */
274 static Chan*
275 loopbackopen(Chan *c, int omode)
276 {
277 	Loop *lb;
278 
279 	if(c->qid.type & QTDIR){
280 		if(omode != OREAD)
281 			error(Ebadarg);
282 		c->mode = omode;
283 		c->flag |= COPEN;
284 		c->offset = 0;
285 		return c;
286 	}
287 
288 	lb = c->aux;
289 	qlock(lb);
290 	if(TYPE(c->qid.path) == Qdata){
291 		if(lb->link[ID(c->qid.path)].ref){
292 			qunlock(lb);
293 			error(Einuse);
294 		}
295 		lb->link[ID(c->qid.path)].ref++;
296 	}
297 	qunlock(lb);
298 
299 	c->mode = openmode(omode);
300 	c->flag |= COPEN;
301 	c->offset = 0;
302 	c->iounit = qiomaxatomic;
303 	return c;
304 }
305 
306 static void
307 loopbackclose(Chan *c)
308 {
309 	Loop *lb;
310 	int ref, chan;
311 
312 	lb = c->aux;
313 
314 	qlock(lb);
315 
316 	/*
317 	 * closing either side hangs up the stream
318 	 */
319 	if((c->flag & COPEN) && TYPE(c->qid.path) == Qdata){
320 		chan = ID(c->qid.path);
321 		if(--lb->link[chan].ref == 0){
322 			qhangup(lb->link[chan ^ 1].oq, nil);
323 			looper(lb);
324 		}
325 	}
326 
327 
328 	/*
329 	 *  if both sides are closed, they are reusable
330 	 */
331 	if(lb->link[0].ref == 0 && lb->link[1].ref == 0){
332 		for(chan = 0; chan < 2; chan++){
333 			closelink(&lb->link[chan], 0);
334 			qreopen(lb->link[chan].iq);
335 			qreopen(lb->link[chan].oq);
336 			qsetlimit(lb->link[chan].oq, lb->link[chan].limit);
337 			qsetlimit(lb->link[chan].iq, lb->link[chan].limit);
338 		}
339 	}
340 	ref = --lb->ref;
341 	if(ref == 0)
342 		freelb(lb);
343 	qunlock(lb);
344 }
345 
346 static void
347 freelb(Loop *lb)
348 {
349 	int chan;
350 
351 	for(chan = 0; chan < 2; chan++)
352 		closelink(&lb->link[chan], 1);
353 }
354 
355 /*
356  * called with the Loop qlocked,
357  * so only pushlink can mess with the queues
358  */
359 static void
360 closelink(Link *link, int dofree)
361 {
362 	Queue *iq, *oq;
363 	Block *bp;
364 
365 	ilock(link);
366 	iq = link->iq;
367 	oq = link->oq;
368 	bp = link->tq;
369 	link->tq = nil;
370 	link->tqtail = nil;
371 	link->tout = 0;
372 	link->tin = 0;
373 	timerdel(&link->ci);
374 	iunlock(link);
375 	if(iq != nil){
376 		qclose(iq);
377 		if(dofree){
378 			ilock(link);
379 			free(iq);
380 			link->iq = nil;
381 			iunlock(link);
382 		}
383 	}
384 	if(oq != nil){
385 		qclose(oq);
386 		if(dofree){
387 			ilock(link);
388 			free(oq);
389 			link->oq = nil;
390 			iunlock(link);
391 		}
392 	}
393 	freeblist(bp);
394 }
395 
396 static long
397 loopbackread(Chan *c, void *va, long n, vlong offset)
398 {
399 	Loop *lb;
400 	Link *link;
401 	char *buf;
402 	long rv;
403 
404 	lb = c->aux;
405 	switch(TYPE(c->qid.path)){
406 	default:
407 		error(Eperm);
408 		return -1;	/* not reached */
409 	case Qtopdir:
410 	case Qloopdir:
411 	case Qportdir:
412 		return devdirread(c, va, n, nil, 0, loopbackgen);
413 	case Qdata:
414 		return qread(lb->link[ID(c->qid.path)].iq, va, n);
415 	case Qstatus:
416 		link = &lb->link[ID(c->qid.path)];
417 		buf = smalloc(Statelen);
418 		rv = snprint(buf, Statelen, "delay %lld %ld\n", link->delay0ns, link->delaynns);
419 		rv += snprint(buf+rv, Statelen-rv, "limit %ld\n", link->limit);
420 		rv += snprint(buf+rv, Statelen-rv, "indrop %d\n", link->indrop);
421 		snprint(buf+rv, Statelen-rv, "droprate %ld\n", link->droprate);
422 		rv = readstr(offset, va, n, buf);
423 		free(buf);
424 		break;
425 	case Qstats:
426 		link = &lb->link[ID(c->qid.path)];
427 		buf = smalloc(Statelen);
428 		rv = snprint(buf, Statelen, "packets: %ld\n", link->packets);
429 		rv += snprint(buf+rv, Statelen-rv, "bytes: %ld\n", link->bytes);
430 		rv += snprint(buf+rv, Statelen-rv, "dropped: %ld\n", link->drops);
431 		snprint(buf+rv, Statelen-rv, "soft overflows: %ld\n", link->soverflows);
432 		rv = readstr(offset, va, n, buf);
433 		free(buf);
434 		break;
435 	}
436 	return rv;
437 }
438 
439 static Block*
440 loopbackbread(Chan *c, long n, ulong offset)
441 {
442 	Loop *lb;
443 
444 	lb = c->aux;
445 	if(TYPE(c->qid.path) == Qdata)
446 		return qbread(lb->link[ID(c->qid.path)].iq, n);
447 
448 	return devbread(c, n, offset);
449 }
450 
451 static long
452 loopbackbwrite(Chan *c, Block *bp, ulong off)
453 {
454 	Loop *lb;
455 
456 	lb = c->aux;
457 	if(TYPE(c->qid.path) == Qdata)
458 		return loopoput(lb, &lb->link[ID(c->qid.path) ^ 1], bp);
459 	return devbwrite(c, bp, off);
460 }
461 
462 static long
463 loopbackwrite(Chan *c, void *va, long n, vlong off)
464 {
465 	Loop *lb;
466 	Link *link;
467 	Cmdbuf *volatile cb;
468 	Block *volatile bp;
469 	vlong d0, d0ns;
470 	long dn, dnns;
471 
472 	switch(TYPE(c->qid.path)){
473 	case Qdata:
474 		bp = allocb(n);
475 		if(waserror()){
476 			freeb(bp);
477 			nexterror();
478 		}
479 		memmove(bp->wp, va, n);
480 		poperror();
481 		bp->wp += n;
482 		return loopbackbwrite(c, bp, off);
483 	case Qctl:
484 		lb = c->aux;
485 		link = &lb->link[ID(c->qid.path)];
486 		cb = parsecmd(va, n);
487 		if(waserror()){
488 			free(cb);
489 			nexterror();
490 		}
491 		if(cb->nf < 1)
492 			error("short control request");
493 		if(strcmp(cb->f[0], "delay") == 0){
494 			if(cb->nf != 3)
495 				error("usage: delay latency bytedelay");
496 			d0ns = strtoll(cb->f[1], nil, 10);
497 			dnns = strtol(cb->f[2], nil, 10);
498 
499 			/*
500 			 * it takes about 20000 cycles on a pentium ii
501 			 * to run pushlink; perhaps this should be accounted.
502 			 */
503 
504 			ilock(link);
505 			link->delay0ns = d0ns;
506 			link->delaynns = dnns;
507 			iunlock(link);
508 		}else if(strcmp(cb->f[0], "indrop") == 0){
509 			if(cb->nf != 2)
510 				error("usage: indrop [01]");
511 			ilock(link);
512 			link->indrop = strtol(cb->f[1], nil, 0) != 0;
513 			iunlock(link);
514 		}else if(strcmp(cb->f[0], "droprate") == 0){
515 			if(cb->nf != 2)
516 				error("usage: droprate ofn");
517 			ilock(link);
518 			link->droprate = strtol(cb->f[1], nil, 0);
519 			iunlock(link);
520 		}else if(strcmp(cb->f[0], "limit") == 0){
521 			if(cb->nf != 2)
522 				error("usage: limit maxqsize");
523 			ilock(link);
524 			link->limit = strtol(cb->f[1], nil, 0);
525 			qsetlimit(link->oq, link->limit);
526 			qsetlimit(link->iq, link->limit);
527 			iunlock(link);
528 		}else if(strcmp(cb->f[0], "reset") == 0){
529 			if(cb->nf != 1)
530 				error("usage: reset");
531 			ilock(link);
532 			link->packets = 0;
533 			link->bytes = 0;
534 			link->indrop = 0;
535 			link->soverflows = 0;
536 			link->drops = 0;
537 			iunlock(link);
538 		}else
539 			error("unknown control request");
540 		poperror();
541 		free(cb);
542 		break;
543 	default:
544 		error(Eperm);
545 	}
546 
547 	return n;
548 }
549 
550 static long
551 loopoput(Loop *lb, Link *link, Block *volatile bp)
552 {
553 	long n;
554 
555 	n = BLEN(bp);
556 
557 	/* make it a single block with space for the loopback timing header */
558 	if(waserror()){
559 		freeb(bp);
560 		nexterror();
561 	}
562 	bp = padblock(bp, Tmsize);
563 	if(bp->next)
564 		bp = concatblock(bp);
565 	if(BLEN(bp) < lb->minmtu)
566 		bp = adjustblock(bp, lb->minmtu);
567 	poperror();
568 	ptime(bp->rp, todget(nil));
569 
570 	link->packets++;
571 	link->bytes += n;
572 
573 	qbwrite(link->oq, bp);
574 
575 	looper(lb);
576 	return n;
577 }
578 
579 static void
580 looper(Loop *lb)
581 {
582 	vlong t;
583 	int chan;
584 
585 	t = todget(nil);
586 	for(chan = 0; chan < 2; chan++)
587 		pushlink(&lb->link[chan], t);
588 }
589 
590 static void
591 linkintr(Ureg*, Timer *ci)
592 {
593 	Link *link;
594 
595 	link = ci->a;
596 	pushlink(link, ci->ns);
597 }
598 
599 /*
600  * move blocks between queues if they are ready.
601  * schedule an interrupt for the next interesting time.
602  *
603  * must be called with the link ilocked.
604  */
605 static void
606 pushlink(Link *link, vlong now)
607 {
608 	Block *bp;
609 	vlong tout, tin;
610 
611 	/*
612 	 * put another block in the link queue
613 	 */
614 	ilock(link);
615 	if(link->iq == nil || link->oq == nil){
616 		iunlock(link);
617 		return;
618 
619 	}
620 	timerdel(&link->ci);
621 
622 	/*
623 	 * put more blocks into the xmit queue
624 	 * use the time the last packet was supposed to go out
625 	 * as the start time for the next packet, rather than
626 	 * the current time.  this more closely models a network
627 	 * device which can queue multiple output packets.
628 	 */
629 	tout = link->tout;
630 	if(!tout)
631 		tout = now;
632 	while(tout <= now){
633 		bp = qget(link->oq);
634 		if(bp == nil){
635 			tout = 0;
636 			break;
637 		}
638 
639 		/*
640 		 * can't send the packet before it gets queued
641 		 */
642 		tin = gtime(bp->rp);
643 		if(tin > tout)
644 			tout = tin;
645 		tout = tout + (BLEN(bp) - Tmsize) * link->delayn;
646 
647 		/*
648 		 * drop packets
649 		 */
650 		if(link->droprate && nrand(link->droprate) == 0)
651 			link->drops++;
652 		else{
653 			ptime(bp->rp, tout + link->delay0ns);
654 			if(link->tq == nil)
655 				link->tq = bp;
656 			else
657 				link->tqtail->next = bp;
658 			link->tqtail = bp;
659 		}
660 	}
661 
662 	/*
663 	 * record the next time a packet can be sent,
664 	 * but don't schedule an interrupt if none is waiting
665 	 */
666 	link->tout = tout;
667 	if(!qcanread(link->oq))
668 		tout = 0;
669 
670 	/*
671 	 * put more blocks into the receive queue
672 	 */
673 	tin = 0;
674 	while(bp = link->tq){
675 		tin = gtime(bp->rp);
676 		if(tin > now)
677 			break;
678 		bp->rp += Tmsize;
679 		link->tq = bp->next;
680 		bp->next = nil;
681 		if(!link->indrop)
682 			qpassnolim(link->iq, bp);
683 		else if(qpass(link->iq, bp) < 0)
684 			link->soverflows++;
685 		tin = 0;
686 	}
687 	if(bp == nil && qisclosed(link->oq) && !qcanread(link->oq) && !qisclosed(link->iq))
688 		qhangup(link->iq, nil);
689 	link->tin = tin;
690 	if(!tin || tin > tout && tout)
691 		tin = tout;
692 
693 	link->ci.ns = tin;
694 	if(tin){
695 		if(tin < now)
696 			panic("loopback unfinished business");
697 		timeradd(&link->ci);
698 	}
699 	iunlock(link);
700 }
701 
702 static void
703 ptime(uchar *p, vlong t)
704 {
705 	ulong tt;
706 
707 	tt = t >> 32;
708 	p[0] = tt >> 24;
709 	p[1] = tt >> 16;
710 	p[2] = tt >> 8;
711 	p[3] = tt;
712 	tt = t;
713 	p[4] = tt >> 24;
714 	p[5] = tt >> 16;
715 	p[6] = tt >> 8;
716 	p[7] = tt;
717 }
718 
719 static vlong
720 gtime(uchar *p)
721 {
722 	ulong t1, t2;
723 
724 	t1 = (p[0] << 24) | (p[1] << 16) | (p[2] << 8) | p[3];
725 	t2 = (p[4] << 24) | (p[5] << 16) | (p[6] << 8) | p[7];
726 	return ((vlong)t1 << 32) | t2;
727 }
728 
729 Dev loopbackdevtab = {
730 	'X',
731 	"loopback",
732 
733 	devreset,
734 	loopbackinit,
735 	devshutdown,
736 	loopbackattach,
737 	loopbackwalk,
738 	loopbackstat,
739 	loopbackopen,
740 	devcreate,
741 	loopbackclose,
742 	loopbackread,
743 	loopbackbread,
744 	loopbackwrite,
745 	loopbackbwrite,
746 	devremove,
747 	devwstat,
748 };
749