1 /* network i/o */
2
3 #include "all.h"
4 #include "io.h"
5 #include <fcall.h> /* 9p2000 */
6 #include <thread.h>
7
/* tunables and connection states */
enum {
	Maxfdata	= 8192,	/* bound on the data part of an incoming 9P message */
	Nqueue		= 200,	/* queue size (tunable) */

	/* values of Netconn.state */
	Netclosed	= 0,
	Netopen		= 1,
};
15
16 /*
17 * the kernel file server read packets directly from
18 * its ethernet(s) and did all the protocol processing.
19 * if the incoming packets were 9p (over il/ip), they
20 * were queued for the server processes to operate upon.
21 *
22 * in user mode, we have one process per incoming connection
23 * instead, and those processes get just the data, minus
24 * tcp and ip headers, so they just see a stream of 9p messages,
25 * which they then queue for the server processes.
26 *
27 * there used to be more queueing (in the kernel), with separate
28 * processes for ethernet input, il input, 9p processing, il output
29 * and ethernet output, and queues connecting them. we now let
30 * the kernel's network queues, protocol stacks and processes do
31 * much of this work.
32 *
33 * partly as a result of this, we can now process 9p messages
34 * transported via tcp, exploit multiple x86 processors, and
35 * were able to shed 70% of the file server's source, by line count.
36 *
37 * the upshot is that Ether (now Network) is no longer a perfect fit for
38 * the way network i/o is done now. the notion of `connection'
39 * is being introduced to complement it.
40 */
41
42 typedef struct Network Network;
43 typedef struct Netconn Netconn;
44 typedef struct Conn9p Conn9p;
45
46 /* a network, not necessarily an ethernet */
47 struct Network {
48 int ctlrno;
49 char iname[NAMELEN];
50 char oname[NAMELEN];
51
52 char *dialstr;
53 char anndir[40];
54 char lisdir[40];
55 int annfd; /* fd from announce */
56 };
57
58 /* an open tcp (or other transport) connection */
59 struct Netconn {
60 Queue* reply; /* network output */
61 char* raddr; /* remote caller's addr */
62 Chan* chan; /* list of tcp channels */
63
64 int alloc; /* flag: allocated */
65
66 int state;
67 Conn9p* conn9p; /* not reference-counted */
68
69 Lock;
70 };
71
72 /*
73 * incoming 9P network connection from a given machine.
74 * typically will multiplex 9P sessions for multiple users.
75 */
76 struct Conn9p {
77 QLock;
78 Ref;
79 int fd;
80 char* dir;
81 Netconn*netconn; /* cross-connection */
82 char* raddr;
83 };
84
85 static Network netif[Maxnets];
86 static struct {
87 Lock;
88 Chan* chan;
89 } netchans;
90 static Queue *netoq; /* only one network output queue is needed */
91
92 char *annstrs[Maxnets] = {
93 "tcp!*!9fs",
94 };
95
96 /* never returns nil */
97 static Chan*
getchan(Conn9p * conn9p)98 getchan(Conn9p *conn9p)
99 {
100 Netconn *netconn;
101 Chan *cp, *xcp;
102
103 lock(&netchans);
104
105 /* look for conn9p's Chan */
106 xcp = nil;
107 for(cp = netchans.chan; cp; cp = netconn->chan) {
108 netconn = cp->pdata;
109 if(!netconn->alloc)
110 xcp = cp; /* remember free Chan */
111 else if(netconn->raddr != nil &&
112 strcmp(conn9p->raddr, netconn->raddr) == 0) {
113 unlock(&netchans);
114 return cp; /* found conn9p's Chan */
115 }
116 }
117
118 /* conn9p's Chan not found; if no free Chan, allocate & fill in one */
119 cp = xcp;
120 if(cp == nil) {
121 cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
122 netconn = cp->pdata;
123 netconn->chan = netchans.chan;
124 netconn->state = Netopen; /* a guess */
125 /* cross-connect netconn and conn9p */
126 netconn->conn9p = conn9p; /* not reference-counted */
127 conn9p->netconn = netconn;
128 netchans.chan = cp;
129 }
130
131 /* fill in Chan's netconn */
132 netconn = cp->pdata;
133 netconn->raddr = strdup(conn9p->raddr);
134
135 /* fill in Chan */
136 cp->send = serveq;
137 if (cp->reply == nil)
138 cp->reply = netoq;
139 netconn->reply = netoq;
140 cp->protocol = nil;
141 cp->msize = 0;
142 cp->whotime = 0;
143 strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan);
144 // cp->whoprint = tcpwhoprint;
145 netconn->alloc = 1;
146
147 unlock(&netchans);
148 return cp;
149 }
150
/*
 * return a freshly allocated copy of the path name of fd,
 * or of "/GOK" if fd2path fails.  the caller frees the result.
 */
static char *
fd2name(int fd)
{
	char path[128];
	char *name;

	name = "/GOK";
	if (fd2path(fd, path, sizeof path) >= 0)
		name = path;
	return strdup(name);
}
160
161 static void
hangupdfd(int dfd)162 hangupdfd(int dfd)
163 {
164 int ctlfd;
165 char *end, *data;
166
167 data = fd2name(dfd);
168 close(dfd);
169
170 end = strstr(data, "/data");
171 if (end != nil)
172 strcpy(end, "/ctl");
173 ctlfd = open(data, OWRITE);
174 if (ctlfd >= 0) {
175 hangup(ctlfd);
176 close(ctlfd);
177 }
178 free(data);
179 }
180
181 void
closechan(int n)182 closechan(int n)
183 {
184 Chan *cp;
185
186 for(cp = chans; cp; cp = cp->next)
187 if(cp->whotime != 0 && cp->chan == n)
188 fileinit(cp);
189 }
190
191 void
nethangup(Chan * cp,char * msg,int dolock)192 nethangup(Chan *cp, char *msg, int dolock)
193 {
194 Netconn *netconn;
195
196 netconn = cp->pdata;
197 netconn->state = Netclosed;
198
199 if(msg != nil)
200 print("hangup! %s %s\n", msg, netconn->raddr);
201
202 fileinit(cp);
203 cp->whotime = 0;
204 strcpy(cp->whoname, "<none>");
205
206 if(dolock)
207 lock(&netchans);
208 netconn->alloc = 0;
209 free(netconn->raddr);
210 netconn->raddr = nil;
211 if(dolock)
212 unlock(&netchans);
213 }
214
215 void
chanhangup(Chan * cp,char * msg,int dolock)216 chanhangup(Chan *cp, char *msg, int dolock)
217 {
218 Netconn *netconn = cp->pdata;
219 Conn9p *conn9p = netconn->conn9p;
220
221 if (conn9p->fd > 0)
222 hangupdfd(conn9p->fd); /* drop it */
223 nethangup(cp, msg, dolock);
224 }
225
226 /*
227 * returns length of next 9p message (including the length) and
228 * leaves it in the first few bytes of abuf.
229 */
230 static long
size9pmsg(int fd,void * abuf,uint n)231 size9pmsg(int fd, void *abuf, uint n)
232 {
233 int m;
234 uchar *buf = abuf;
235
236 if (n < BIT32SZ)
237 return -1; /* caller screwed up */
238
239 /* read count */
240 m = readn(fd, buf, BIT32SZ);
241 if(m != BIT32SZ){
242 if(m < 0)
243 return -1;
244 return 0;
245 }
246 return GBIT32(buf);
247 }
248
249 static int
readalloc9pmsg(int fd,Msgbuf ** mbp)250 readalloc9pmsg(int fd, Msgbuf **mbp)
251 {
252 int m, len;
253 uchar lenbuf[BIT32SZ];
254 Msgbuf *mb;
255
256 *mbp = nil;
257 len = size9pmsg(fd, lenbuf, BIT32SZ);
258 if (len <= 0)
259 return len;
260 if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
261 werrstr("bad length in 9P2000 message header");
262 return -1;
263 }
264 if ((mb = mballoc(len, nil, Mbeth1)) == nil)
265 panic("readalloc9pmsg: mballoc failed");
266 *mbp = mb;
267 memmove(mb->data, lenbuf, BIT32SZ);
268 len -= BIT32SZ;
269 m = readn(fd, mb->data+BIT32SZ, len);
270 if(m < len)
271 return 0;
272 return BIT32SZ+m;
273 }
274
275 static void
connection(void * v)276 connection(void *v)
277 {
278 int n;
279 char buf[64];
280 Chan *chan9p;
281 Conn9p *conn9p = v;
282 Msgbuf *mb;
283 NetConnInfo *nci;
284
285 incref(conn9p); /* count connections */
286 nci = getnetconninfo(conn9p->dir, conn9p->fd);
287 if (nci == nil)
288 panic("connection: getnetconninfo(%s, %d) failed",
289 conn9p->dir, conn9p->fd);
290 conn9p->raddr = nci->raddr;
291
292 chan9p = getchan(conn9p);
293 print("new connection on %s pid %d from %s\n",
294 conn9p->dir, getpid(), conn9p->raddr);
295
296 /*
297 * reading from a pipe or a network device
298 * will give an error after a few eof reads.
299 * however, we cannot tell the difference
300 * between a zero-length read and an interrupt
301 * on the processes writing to us,
302 * so we wait for the error.
303 */
304 while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
305 if(n == 0)
306 continue;
307 mb->param = (uintptr)conn9p; /* has fd for replies */
308 mb->chan = chan9p;
309
310 assert(mb->magic == Mbmagic);
311 incref(conn9p); /* & count packets in flight */
312 fs_send(serveq, mb); /* to 9P server processes */
313 /* mb will be freed by receiving process */
314 }
315
316 rerrstr(buf, sizeof buf);
317
318 qlock(conn9p);
319 print("connection hung up from %s\n", conn9p->dir);
320 if (conn9p->fd > 0) /* not poisoned yet? */
321 hangupdfd(conn9p->fd); /* poison the fd */
322
323 nethangup(chan9p, "remote hung up", 1);
324 closechan(chan9p->chan);
325
326 conn9p->fd = -1; /* poison conn9p */
327 if (decref(conn9p) == 0) { /* last conn.? turn the lights off */
328 free(conn9p->dir);
329 qunlock(conn9p);
330 free(conn9p);
331 } else
332 qunlock(conn9p);
333
334 freenetconninfo(nci);
335
336 if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
337 exits("");
338 sysfatal("mount read, pid %d", getpid());
339 }
340
341 static void
neti(void * v)342 neti(void *v)
343 {
344 int lisfd, accfd;
345 Network *net;
346 Conn9p *conn9p;
347
348 net = v;
349 print("net%di\n", net->ctlrno);
350 for(;;) {
351 lisfd = listen(net->anndir, net->lisdir);
352 if (lisfd < 0) {
353 print("listen %s failed: %r\n", net->anndir);
354 continue;
355 }
356
357 /* got new call on lisfd */
358 accfd = accept(lisfd, net->lisdir);
359 if (accfd < 0) {
360 print("accept %d (from %s) failed: %r\n",
361 lisfd, net->lisdir);
362 continue;
363 }
364
365 /* accepted that call */
366 conn9p = malloc(sizeof *conn9p);
367 conn9p->dir = strdup(net->lisdir);
368 conn9p->fd = accfd;
369 newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
370 close(lisfd);
371 }
372 }
373
374 /* only need one of these for all network connections, thus all interfaces */
375 static void
neto(void *)376 neto(void *)
377 {
378 int len, datafd;
379 Msgbuf *mb;
380 Conn9p *conn9p;
381
382 print("neto\n");
383 for(;;) {
384 /* receive 9P answer from 9P server processes */
385 while((mb = fs_recv(netoq, 0)) == nil)
386 continue;
387
388 if(mb->data == nil) {
389 print("neto: pkt nil cat=%d free=%d\n",
390 mb->category, mb->flags&FREE);
391 if(!(mb->flags & FREE))
392 mbfree(mb);
393 continue;
394 }
395
396 /* send answer back over the network connection in the reply */
397 len = mb->count;
398 conn9p = (Conn9p *)mb->param;
399 assert(conn9p);
400
401 qlock(conn9p);
402 datafd = conn9p->fd;
403 assert(len >= 0);
404 /* datafd < 0 probably indicates poisoning by the read side */
405 if (datafd < 0 || write(datafd, mb->data, len) != len) {
406 print( "network write error (%r);");
407 print(" closing connection for %s\n", conn9p->dir);
408 nethangup(getchan(conn9p), "network write error", 1);
409 if (datafd > 0)
410 hangupdfd(datafd); /* drop it */
411 conn9p->fd = -1; /* poison conn9p */
412 }
413 mbfree(mb);
414 if (decref(conn9p) == 0)
415 panic("neto: zero ref count");
416 qunlock(conn9p);
417 }
418 }
419
420 void
netstart(void)421 netstart(void)
422 {
423 int netorun = 0;
424 Network *net;
425
426 if(netoq == nil)
427 netoq = newqueue(Nqueue, "network reply");
428 for(net = &netif[0]; net < &netif[Maxnets]; net++){
429 if(net->dialstr == nil)
430 continue;
431 sprint(net->oname, "neto");
432 if (netorun++ == 0)
433 newproc(neto, nil, net->oname);
434 sprint(net->iname, "net%di", net->ctlrno);
435 newproc(neti, net, net->iname);
436 }
437 }
438
439 void
netinit(void)440 netinit(void)
441 {
442 Network *net;
443
444 for (net = netif; net < netif + Maxnets; net++) {
445 net->dialstr = annstrs[net - netif];
446 if (net->dialstr == nil)
447 continue;
448 net->annfd = announce(net->dialstr, net->anndir);
449 /* /bin/service/tcp564 may already have grabbed the port */
450 if (net->annfd < 0)
451 sysfatal("can't announce %s: %r", net->dialstr);
452 print("netinit: announced on %s\n", net->dialstr);
453 }
454 }
455