1 /* network i/o */ 2 3 #include "all.h" 4 #include "io.h" 5 #include <fcall.h> /* 9p2000 */ 6 #include <thread.h> 7 8 enum { 9 Maxfdata = 8192, 10 Nqueue = 200, /* queue size (tunable) */ 11 12 Netclosed = 0, /* Connection state */ 13 Netopen, 14 }; 15 16 /* 17 * the kernel file server read packets directly from 18 * its ethernet(s) and did all the protocol processing. 19 * if the incoming packets were 9p (over il/ip), they 20 * were queued for the server processes to operate upon. 21 * 22 * in user mode, we have one process per incoming connection 23 * instead, and those processes get just the data, minus 24 * tcp and ip headers, so they just see a stream of 9p messages, 25 * which they then queue for the server processes. 26 * 27 * there used to be more queueing (in the kernel), with separate 28 * processes for ethernet input, il input, 9p processing, il output 29 * and ethernet output, and queues connecting them. we now let 30 * the kernel's network queues, protocol stacks and processes do 31 * much of this work. 32 * 33 * partly as a result of this, we can now process 9p messages 34 * transported via tcp, exploit multiple x86 processors, and 35 * were able to shed 70% of the file server's source, by line count. 36 * 37 * the upshot is that Ether (now Network) is no longer a perfect fit for 38 * the way network i/o is done now. the notion of `connection' 39 * is being introduced to complement it. 40 */ 41 42 typedef struct Network Network; 43 typedef struct Netconn Netconn; 44 typedef struct Conn9p Conn9p; 45 46 /* a network, not necessarily an ethernet */ 47 struct Network { 48 int ctlrno; 49 char iname[NAMELEN]; 50 char oname[NAMELEN]; 51 52 char *dialstr; 53 char anndir[40]; 54 char lisdir[40]; 55 int annfd; /* fd from announce */ 56 }; 57 58 /* an open tcp (or other transport) connection */ 59 struct Netconn { 60 Queue* reply; /* network output */ 61 char* raddr; /* remote caller's addr */ 62 Chan* chan; /* list of tcp channels */ 63 64 int alloc; /* flag: allocated */ 65 66 int state; 67 Conn9p* conn9p; /* not reference-counted */ 68 69 Lock; 70 }; 71 72 /* 73 * incoming 9P network connection from a given machine. 74 * typically will multiplex 9P sessions for multiple users. 75 */ 76 struct Conn9p { 77 QLock; 78 Ref; 79 int fd; 80 char* dir; 81 Netconn*netconn; /* cross-connection */ 82 char* raddr; 83 }; 84 85 static Network netif[Maxnets]; 86 static struct { 87 Lock; 88 Chan* chan; 89 } netchans; 90 static Queue *netoq; /* only one network output queue is needed */ 91 92 char *annstrs[Maxnets] = { 93 "tcp!*!9fs", 94 }; 95 96 /* never returns nil */ 97 static Chan* 98 getchan(Conn9p *conn9p) 99 { 100 Netconn *netconn; 101 Chan *cp, *xcp; 102 103 lock(&netchans); 104 105 /* look for conn9p's Chan */ 106 xcp = nil; 107 for(cp = netchans.chan; cp; cp = netconn->chan) { 108 netconn = cp->pdata; 109 if(!netconn->alloc) 110 xcp = cp; /* remember free Chan */ 111 else if(netconn->raddr != nil && 112 strcmp(conn9p->raddr, netconn->raddr) == 0) { 113 unlock(&netchans); 114 return cp; /* found conn9p's Chan */ 115 } 116 } 117 118 /* conn9p's Chan not found; if no free Chan, allocate & fill in one */ 119 cp = xcp; 120 if(cp == nil) { 121 cp = fs_chaninit(Devnet, 1, sizeof(Netconn)); 122 netconn = cp->pdata; 123 netconn->chan = netchans.chan; 124 netconn->state = Netopen; /* a guess */ 125 /* cross-connect netconn and conn9p */ 126 netconn->conn9p = conn9p; /* not reference-counted */ 127 conn9p->netconn = netconn; 128 netchans.chan = cp; 129 } 130 131 /* fill in Chan's netconn */ 132 netconn = cp->pdata; 133 netconn->raddr = strdup(conn9p->raddr); 134 135 /* fill in Chan */ 136 cp->send = serveq; 137 if (cp->reply == nil) 138 cp->reply = netoq; 139 netconn->reply = netoq; 140 cp->protocol = nil; 141 cp->msize = 0; 142 cp->whotime = 0; 143 strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan); 144 // cp->whoprint = tcpwhoprint; 145 netconn->alloc = 1; 146 147 unlock(&netchans); 148 return cp; 149 } 150 151 static char * 152 fd2name(int fd) 153 { 154 char data[128]; 155 156 if (fd2path(fd, data, sizeof data) < 0) 157 return strdup("/GOK"); 158 return strdup(data); 159 } 160 161 static void 162 hangupdfd(int dfd) 163 { 164 int ctlfd; 165 char *end, *data; 166 167 data = fd2name(dfd); 168 close(dfd); 169 170 end = strstr(data, "/data"); 171 if (end != nil) 172 strcpy(end, "/ctl"); 173 ctlfd = open(data, OWRITE); 174 if (ctlfd >= 0) { 175 hangup(ctlfd); 176 close(ctlfd); 177 } 178 free(data); 179 } 180 181 void 182 closechan(int n) 183 { 184 Chan *cp; 185 186 for(cp = chans; cp; cp = cp->next) 187 if(cp->whotime != 0 && cp->chan == n) 188 fileinit(cp); 189 } 190 191 void 192 nethangup(Chan *cp, char *msg, int dolock) 193 { 194 Netconn *netconn; 195 196 netconn = cp->pdata; 197 netconn->state = Netclosed; 198 199 if(msg != nil) 200 print("hangup! %s %s\n", msg, netconn->raddr); 201 202 fileinit(cp); 203 cp->whotime = 0; 204 strcpy(cp->whoname, "<none>"); 205 206 if(dolock) 207 lock(&netchans); 208 netconn->alloc = 0; 209 free(netconn->raddr); 210 netconn->raddr = nil; 211 if(dolock) 212 unlock(&netchans); 213 } 214 215 void 216 chanhangup(Chan *cp, char *msg, int dolock) 217 { 218 Netconn *netconn = cp->pdata; 219 Conn9p *conn9p = netconn->conn9p; 220 221 if (conn9p->fd > 0) 222 hangupdfd(conn9p->fd); /* drop it */ 223 nethangup(cp, msg, dolock); 224 } 225 226 /* 227 * returns length of next 9p message (including the length) and 228 * leaves it in the first few bytes of abuf. 229 */ 230 static long 231 size9pmsg(int fd, void *abuf, uint n) 232 { 233 int m; 234 uchar *buf = abuf; 235 236 if (n < BIT32SZ) 237 return -1; /* caller screwed up */ 238 239 /* read count */ 240 m = readn(fd, buf, BIT32SZ); 241 if(m != BIT32SZ){ 242 if(m < 0) 243 return -1; 244 return 0; 245 } 246 return GBIT32(buf); 247 } 248 249 static int 250 readalloc9pmsg(int fd, Msgbuf **mbp) 251 { 252 int m, len; 253 uchar lenbuf[BIT32SZ]; 254 Msgbuf *mb; 255 256 *mbp = nil; 257 len = size9pmsg(fd, lenbuf, BIT32SZ); 258 if (len <= 0) 259 return len; 260 if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){ 261 werrstr("bad length in 9P2000 message header"); 262 return -1; 263 } 264 if ((mb = mballoc(len, nil, Mbeth1)) == nil) 265 panic("readalloc9pmsg: mballoc failed"); 266 *mbp = mb; 267 memmove(mb->data, lenbuf, BIT32SZ); 268 len -= BIT32SZ; 269 m = readn(fd, mb->data+BIT32SZ, len); 270 if(m < len) 271 return 0; 272 return BIT32SZ+m; 273 } 274 275 static void 276 connection(void *v) 277 { 278 int n; 279 char buf[64]; 280 Chan *chan9p; 281 Conn9p *conn9p = v; 282 Msgbuf *mb; 283 NetConnInfo *nci; 284 285 incref(conn9p); /* count connections */ 286 nci = getnetconninfo(conn9p->dir, conn9p->fd); 287 if (nci == nil) 288 panic("connection: getnetconninfo(%s, %d) failed", 289 conn9p->dir, conn9p->fd); 290 conn9p->raddr = nci->raddr; 291 292 chan9p = getchan(conn9p); 293 print("new connection on %s pid %d from %s\n", 294 conn9p->dir, getpid(), conn9p->raddr); 295 296 /* 297 * reading from a pipe or a network device 298 * will give an error after a few eof reads. 299 * however, we cannot tell the difference 300 * between a zero-length read and an interrupt 301 * on the processes writing to us, 302 * so we wait for the error. 303 */ 304 while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) { 305 if(n == 0) 306 continue; 307 mb->param = (uintptr)conn9p; /* has fd for replies */ 308 mb->chan = chan9p; 309 310 assert(mb->magic == Mbmagic); 311 incref(conn9p); /* & count packets in flight */ 312 fs_send(serveq, mb); /* to 9P server processes */ 313 /* mb will be freed by receiving process */ 314 } 315 316 rerrstr(buf, sizeof buf); 317 318 qlock(conn9p); 319 print("connection hung up from %s\n", conn9p->dir); 320 if (conn9p->fd > 0) /* not poisoned yet? */ 321 hangupdfd(conn9p->fd); /* poison the fd */ 322 323 nethangup(chan9p, "remote hung up", 1); 324 closechan(chan9p->chan); 325 326 conn9p->fd = -1; /* poison conn9p */ 327 if (decref(conn9p) == 0) { /* last conn.? turn the lights off */ 328 free(conn9p->dir); 329 qunlock(conn9p); 330 free(conn9p); 331 } else 332 qunlock(conn9p); 333 334 freenetconninfo(nci); 335 336 if(buf[0] == '\0' || strstr(buf, "hungup") != nil) 337 exits(""); 338 sysfatal("mount read, pid %d", getpid()); 339 } 340 341 static void 342 neti(void *v) 343 { 344 int lisfd, accfd; 345 Network *net; 346 Conn9p *conn9p; 347 348 net = v; 349 print("net%di\n", net->ctlrno); 350 for(;;) { 351 lisfd = listen(net->anndir, net->lisdir); 352 if (lisfd < 0) { 353 print("listen %s failed: %r\n", net->anndir); 354 continue; 355 } 356 357 /* got new call on lisfd */ 358 accfd = accept(lisfd, net->lisdir); 359 if (accfd < 0) { 360 print("accept %d (from %s) failed: %r\n", 361 lisfd, net->lisdir); 362 continue; 363 } 364 365 /* accepted that call */ 366 conn9p = malloc(sizeof *conn9p); 367 conn9p->dir = strdup(net->lisdir); 368 conn9p->fd = accfd; 369 newproc(connection, conn9p, smprint("9P read %s", conn9p->dir)); 370 close(lisfd); 371 } 372 } 373 374 /* only need one of these for all network connections, thus all interfaces */ 375 static void 376 neto(void *) 377 { 378 int len, datafd; 379 Msgbuf *mb; 380 Conn9p *conn9p; 381 382 print("neto\n"); 383 for(;;) { 384 /* receive 9P answer from 9P server processes */ 385 while((mb = fs_recv(netoq, 0)) == nil) 386 continue; 387 388 if(mb->data == nil) { 389 print("neto: pkt nil cat=%d free=%d\n", 390 mb->category, mb->flags&FREE); 391 if(!(mb->flags & FREE)) 392 mbfree(mb); 393 continue; 394 } 395 396 /* send answer back over the network connection in the reply */ 397 len = mb->count; 398 conn9p = (Conn9p *)mb->param; 399 assert(conn9p); 400 401 qlock(conn9p); 402 datafd = conn9p->fd; 403 assert(len >= 0); 404 /* datafd < 0 probably indicates poisoning by the read side */ 405 if (datafd < 0 || write(datafd, mb->data, len) != len) { 406 print( "network write error (%r);"); 407 print(" closing connection for %s\n", conn9p->dir); 408 nethangup(getchan(conn9p), "network write error", 1); 409 if (datafd > 0) 410 hangupdfd(datafd); /* drop it */ 411 conn9p->fd = -1; /* poison conn9p */ 412 } 413 mbfree(mb); 414 if (decref(conn9p) == 0) 415 panic("neto: zero ref count"); 416 qunlock(conn9p); 417 } 418 } 419 420 void 421 netstart(void) 422 { 423 int netorun = 0; 424 Network *net; 425 426 if(netoq == nil) 427 netoq = newqueue(Nqueue, "network reply"); 428 for(net = &netif[0]; net < &netif[Maxnets]; net++){ 429 if(net->dialstr == nil) 430 continue; 431 sprint(net->oname, "neto"); 432 if (netorun++ == 0) 433 newproc(neto, nil, net->oname); 434 sprint(net->iname, "net%di", net->ctlrno); 435 newproc(neti, net, net->iname); 436 } 437 } 438 439 void 440 netinit(void) 441 { 442 Network *net; 443 444 for (net = netif; net < netif + Maxnets; net++) { 445 net->dialstr = annstrs[net - netif]; 446 if (net->dialstr == nil) 447 continue; 448 net->annfd = announce(net->dialstr, net->anndir); 449 /* /bin/service/tcp564 may already have grabbed the port */ 450 if (net->annfd < 0) 451 sysfatal("can't announce %s: %r", net->dialstr); 452 print("netinit: announced on %s\n", net->dialstr); 453 } 454 } 455