xref: /plan9/sys/src/cmd/cwfs/net.c (revision 01a344a29f2ff35133953eaef092a50fc8c3163b)
1 /* network i/o */
2 
3 #include "all.h"
4 #include "io.h"
5 #include <fcall.h>		/* 9p2000 */
6 #include <thread.h>
7 
/* sizing of incoming 9P messages and of the network reply queue */
enum {
	Maxfdata	= 8192,		/* bound on a message's data beyond IOHDRSZ */
	Nqueue		= 200,		/* netoq queue size (tunable) */
};

/* values of Netconn.state */
enum {
	Netclosed	= 0,
	Netopen		= 1,
};
15 
16 /*
17  * the kernel file server read packets directly from
18  * its ethernet(s) and did all the protocol processing.
19  * if the incoming packets were 9p (over il/ip), they
20  * were queued for the server processes to operate upon.
21  *
22  * in user mode, we have one process per incoming connection
23  * instead, and those processes get just the data, minus
24  * tcp and ip headers, so they just see a stream of 9p messages,
25  * which they then queue for the server processes.
26  *
27  * there used to be more queueing (in the kernel), with separate
28  * processes for ethernet input, il input, 9p processing, il output
29  * and ethernet output, and queues connecting them.  we now let
30  * the kernel's network queues, protocol stacks and processes do
31  * much of this work.
32  *
33  * partly as a result of this, we can now process 9p messages
34  * transported via tcp, exploit multiple x86 processors, and
35  * were able to shed 70% of the file server's source, by line count.
36  *
37  * the upshot is that Ether (now Network) is no longer a perfect fit for
38  * the way network i/o is done now.  the notion of `connection'
39  * is being introduced to complement it.
40  */
41 
typedef struct Network Network;
typedef struct Netconn Netconn;
typedef struct Conn9p Conn9p;

/*
 * a network, not necessarily an ethernet: one announced address
 * (dialstr) plus the state its listener process (neti) uses.
 */
struct Network {
	int	ctlrno;			/* index used in the listener's name, net%di */
	char	iname[NAMELEN];		/* listener process name */
	char	oname[NAMELEN];		/* output process name ("neto") */

	char	*dialstr;		/* address to announce, from annstrs; nil = unused */
	char	anndir[40];		/* directory filled in by announce */
	char	lisdir[40];		/* directory filled in by listen, used by accept */
	int	annfd;			/* fd from announce */
};

/*
 * an open tcp (or other transport) connection; this is the
 * private data (Chan.pdata) of a network Chan.
 */
struct Netconn {
	Queue*	reply;		/* network output */
	char*	raddr;		/* remote caller's addr; malloc'd, nil when free */
	Chan*	chan;		/* next Chan on the netchans list */

	int	alloc;		/* flag: allocated */

	int	state;		/* Netopen or Netclosed */
	Conn9p*	conn9p;		/* not reference-counted */

	Lock;			/* NOTE(review): not taken by the code visible in this file */
};

/*
 * incoming 9P network connection from a given machine.
 * typically will multiplex 9P sessions for multiple users.
 * the Ref counts the reader process plus each request still in
 * flight; the QLock serializes use of fd between reader and writer.
 */
struct Conn9p {
	QLock;
	Ref;
	int	fd;			/* accepted data fd; -1 once poisoned */
	char*	dir;			/* listen directory of the call */
	Netconn*netconn;		/* cross-connection */
	char*	raddr;			/* remote address of the caller */
};
84 
/* one slot per announced address; slot i corresponds to annstrs[i] */
static Network netif[Maxnets];
/* every network Chan, linked through Netconn.chan; guarded by its Lock */
static struct {
	Lock;
	Chan*	chan;
} netchans;
static Queue *netoq;		/* only one network output queue is needed */

/* addresses to announce on, indexed like netif; nil entries are unused */
char *annstrs[Maxnets] = {
	"tcp!*!9fs",
};
95 
96 /* never returns nil */
97 static Chan*
getchan(Conn9p * conn9p)98 getchan(Conn9p *conn9p)
99 {
100 	Netconn *netconn;
101 	Chan *cp, *xcp;
102 
103 	lock(&netchans);
104 
105 	/* look for conn9p's Chan */
106 	xcp = nil;
107 	for(cp = netchans.chan; cp; cp = netconn->chan) {
108 		netconn = cp->pdata;
109 		if(!netconn->alloc)
110 			xcp = cp;		/* remember free Chan */
111 		else if(netconn->raddr != nil &&
112 		    strcmp(conn9p->raddr, netconn->raddr) == 0) {
113 			unlock(&netchans);
114 			return cp;		/* found conn9p's Chan */
115 		}
116 	}
117 
118 	/* conn9p's Chan not found; if no free Chan, allocate & fill in one */
119 	cp = xcp;
120 	if(cp == nil) {
121 		cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
122 		netconn = cp->pdata;
123 		netconn->chan = netchans.chan;
124 		netconn->state = Netopen;	/* a guess */
125 		/* cross-connect netconn and conn9p */
126 		netconn->conn9p = conn9p;	/* not reference-counted */
127 		conn9p->netconn = netconn;
128 		netchans.chan = cp;
129 	}
130 
131 	/* fill in Chan's netconn */
132 	netconn = cp->pdata;
133 	netconn->raddr = strdup(conn9p->raddr);
134 
135 	/* fill in Chan */
136 	cp->send = serveq;
137 	if (cp->reply == nil)
138 		cp->reply = netoq;
139 	netconn->reply = netoq;
140 	cp->protocol = nil;
141 	cp->msize = 0;
142 	cp->whotime = 0;
143 	strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan);
144 //	cp->whoprint = tcpwhoprint;
145 	netconn->alloc = 1;
146 
147 	unlock(&netchans);
148 	return cp;
149 }
150 
/*
 * fd2name returns a malloc'd copy of the path that fd2path reports
 * for fd, or a copy of "/GOK" when fd2path fails.  the caller must
 * free the result (which is nil if strdup fails).
 */
static char *
fd2name(int fd)
{
	char path[128];
	char *name;

	name = "/GOK";
	if (fd2path(fd, path, sizeof path) >= 0)
		name = path;
	return strdup(name);
}
160 
161 static void
hangupdfd(int dfd)162 hangupdfd(int dfd)
163 {
164 	int ctlfd;
165 	char *end, *data;
166 
167 	data = fd2name(dfd);
168 	close(dfd);
169 
170 	end = strstr(data, "/data");
171 	if (end != nil)
172 		strcpy(end, "/ctl");
173 	ctlfd = open(data, OWRITE);
174 	if (ctlfd >= 0) {
175 		hangup(ctlfd);
176 		close(ctlfd);
177 	}
178 	free(data);
179 }
180 
181 void
closechan(int n)182 closechan(int n)
183 {
184 	Chan *cp;
185 
186 	for(cp = chans; cp; cp = cp->next)
187 		if(cp->whotime != 0 && cp->chan == n)
188 			fileinit(cp);
189 }
190 
191 void
nethangup(Chan * cp,char * msg,int dolock)192 nethangup(Chan *cp, char *msg, int dolock)
193 {
194 	Netconn *netconn;
195 
196 	netconn = cp->pdata;
197 	netconn->state = Netclosed;
198 
199 	if(msg != nil)
200 		print("hangup! %s %s\n", msg, netconn->raddr);
201 
202 	fileinit(cp);
203 	cp->whotime = 0;
204 	strcpy(cp->whoname, "<none>");
205 
206 	if(dolock)
207 		lock(&netchans);
208 	netconn->alloc = 0;
209 	free(netconn->raddr);
210 	netconn->raddr = nil;
211 	if(dolock)
212 		unlock(&netchans);
213 }
214 
215 void
chanhangup(Chan * cp,char * msg,int dolock)216 chanhangup(Chan *cp, char *msg, int dolock)
217 {
218 	Netconn *netconn = cp->pdata;
219 	Conn9p *conn9p = netconn->conn9p;
220 
221 	if (conn9p->fd > 0)
222 		hangupdfd(conn9p->fd);	/* drop it */
223 	nethangup(cp, msg, dolock);
224 }
225 
226 /*
227  * returns length of next 9p message (including the length) and
228  * leaves it in the first few bytes of abuf.
229  */
230 static long
size9pmsg(int fd,void * abuf,uint n)231 size9pmsg(int fd, void *abuf, uint n)
232 {
233 	int m;
234 	uchar *buf = abuf;
235 
236 	if (n < BIT32SZ)
237 		return -1;	/* caller screwed up */
238 
239 	/* read count */
240 	m = readn(fd, buf, BIT32SZ);
241 	if(m != BIT32SZ){
242 		if(m < 0)
243 			return -1;
244 		return 0;
245 	}
246 	return GBIT32(buf);
247 }
248 
249 static int
readalloc9pmsg(int fd,Msgbuf ** mbp)250 readalloc9pmsg(int fd, Msgbuf **mbp)
251 {
252 	int m, len;
253 	uchar lenbuf[BIT32SZ];
254 	Msgbuf *mb;
255 
256 	*mbp = nil;
257 	len = size9pmsg(fd, lenbuf, BIT32SZ);
258 	if (len <= 0)
259 		return len;
260 	if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
261 		werrstr("bad length in 9P2000 message header");
262 		return -1;
263 	}
264 	if ((mb = mballoc(len, nil, Mbeth1)) == nil)
265 		panic("readalloc9pmsg: mballoc failed");
266 	*mbp = mb;
267 	memmove(mb->data, lenbuf, BIT32SZ);
268 	len -= BIT32SZ;
269 	m = readn(fd, mb->data+BIT32SZ, len);
270 	if(m < len)
271 		return 0;
272 	return BIT32SZ+m;
273 }
274 
275 static void
connection(void * v)276 connection(void *v)
277 {
278 	int n;
279 	char buf[64];
280 	Chan *chan9p;
281 	Conn9p *conn9p = v;
282 	Msgbuf *mb;
283 	NetConnInfo *nci;
284 
285 	incref(conn9p);			/* count connections */
286 	nci = getnetconninfo(conn9p->dir, conn9p->fd);
287 	if (nci == nil)
288 		panic("connection: getnetconninfo(%s, %d) failed",
289 			conn9p->dir, conn9p->fd);
290 	conn9p->raddr = nci->raddr;
291 
292 	chan9p = getchan(conn9p);
293 	print("new connection on %s pid %d from %s\n",
294 		conn9p->dir, getpid(), conn9p->raddr);
295 
296 	/*
297 	 * reading from a pipe or a network device
298 	 * will give an error after a few eof reads.
299 	 * however, we cannot tell the difference
300 	 * between a zero-length read and an interrupt
301 	 * on the processes writing to us,
302 	 * so we wait for the error.
303 	 */
304 	while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
305 		if(n == 0)
306 			continue;
307 		mb->param = (uintptr)conn9p;	/* has fd for replies */
308 		mb->chan = chan9p;
309 
310 		assert(mb->magic == Mbmagic);
311 		incref(conn9p);			/* & count packets in flight */
312 		fs_send(serveq, mb);		/* to 9P server processes */
313 		/* mb will be freed by receiving process */
314 	}
315 
316 	rerrstr(buf, sizeof buf);
317 
318 	qlock(conn9p);
319 	print("connection hung up from %s\n", conn9p->dir);
320 	if (conn9p->fd > 0)		/* not poisoned yet? */
321 		hangupdfd(conn9p->fd);	/* poison the fd */
322 
323 	nethangup(chan9p, "remote hung up", 1);
324 	closechan(chan9p->chan);
325 
326 	conn9p->fd = -1;		/* poison conn9p */
327 	if (decref(conn9p) == 0) {	/* last conn.? turn the lights off */
328 		free(conn9p->dir);
329 		qunlock(conn9p);
330 		free(conn9p);
331 	} else
332 		qunlock(conn9p);
333 
334 	freenetconninfo(nci);
335 
336 	if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
337 		exits("");
338 	sysfatal("mount read, pid %d", getpid());
339 }
340 
341 static void
neti(void * v)342 neti(void *v)
343 {
344 	int lisfd, accfd;
345 	Network *net;
346 	Conn9p *conn9p;
347 
348 	net = v;
349 	print("net%di\n", net->ctlrno);
350 	for(;;) {
351 		lisfd = listen(net->anndir, net->lisdir);
352 		if (lisfd < 0) {
353 			print("listen %s failed: %r\n", net->anndir);
354 			continue;
355 		}
356 
357 		/* got new call on lisfd */
358 		accfd = accept(lisfd, net->lisdir);
359 		if (accfd < 0) {
360 			print("accept %d (from %s) failed: %r\n",
361 				lisfd, net->lisdir);
362 			continue;
363 		}
364 
365 		/* accepted that call */
366 		conn9p = malloc(sizeof *conn9p);
367 		conn9p->dir = strdup(net->lisdir);
368 		conn9p->fd = accfd;
369 		newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
370 		close(lisfd);
371 	}
372 }
373 
374 /* only need one of these for all network connections, thus all interfaces */
375 static void
neto(void *)376 neto(void *)
377 {
378 	int len, datafd;
379 	Msgbuf *mb;
380 	Conn9p *conn9p;
381 
382 	print("neto\n");
383 	for(;;) {
384 		/* receive 9P answer from 9P server processes */
385 		while((mb = fs_recv(netoq, 0)) == nil)
386 			continue;
387 
388 		if(mb->data == nil) {
389 			print("neto: pkt nil cat=%d free=%d\n",
390 				mb->category, mb->flags&FREE);
391 			if(!(mb->flags & FREE))
392 				mbfree(mb);
393 			continue;
394 		}
395 
396 		/* send answer back over the network connection in the reply */
397 		len = mb->count;
398 		conn9p = (Conn9p *)mb->param;
399 		assert(conn9p);
400 
401 		qlock(conn9p);
402 		datafd = conn9p->fd;
403 		assert(len >= 0);
404 		/* datafd < 0 probably indicates poisoning by the read side */
405 		if (datafd < 0 || write(datafd, mb->data, len) != len) {
406 			print( "network write error (%r);");
407 			print(" closing connection for %s\n", conn9p->dir);
408 			nethangup(getchan(conn9p), "network write error", 1);
409 			if (datafd > 0)
410 				hangupdfd(datafd);	/* drop it */
411 			conn9p->fd = -1;		/* poison conn9p */
412 		}
413 		mbfree(mb);
414 		if (decref(conn9p) == 0)
415 			panic("neto: zero ref count");
416 		qunlock(conn9p);
417 	}
418 }
419 
420 void
netstart(void)421 netstart(void)
422 {
423 	int netorun = 0;
424 	Network *net;
425 
426 	if(netoq == nil)
427 		netoq = newqueue(Nqueue, "network reply");
428 	for(net = &netif[0]; net < &netif[Maxnets]; net++){
429 		if(net->dialstr == nil)
430 			continue;
431 		sprint(net->oname, "neto");
432 		if (netorun++ == 0)
433 			newproc(neto, nil, net->oname);
434 		sprint(net->iname, "net%di", net->ctlrno);
435 		newproc(neti, net, net->iname);
436 	}
437 }
438 
439 void
netinit(void)440 netinit(void)
441 {
442 	Network *net;
443 
444 	for (net = netif; net < netif + Maxnets; net++) {
445 		net->dialstr = annstrs[net - netif];
446 		if (net->dialstr == nil)
447 			continue;
448 		net->annfd = announce(net->dialstr, net->anndir);
449 		/* /bin/service/tcp564 may already have grabbed the port */
450 		if (net->annfd < 0)
451 			sysfatal("can't announce %s: %r", net->dialstr);
452 		print("netinit: announced on %s\n", net->dialstr);
453 	}
454 }
455