1 #include <u.h> 2 #include <libc.h> 3 #include <fcall.h> 4 #include <thread.h> 5 #include <9p.h> 6 #include "dat.h" 7 8 int nclient; 9 Client **client; 10 #define Zmsg ((Msg*)~0) 11 char nocmd[] = ""; 12 13 static void readthread(void*); 14 static void writethread(void*); 15 static void kickwriter(Client*); 16 17 int 18 newclient(void) 19 { 20 int i; 21 Client *c; 22 23 for(i=0; i<nclient; i++) 24 if(client[i]->ref==0 && !client[i]->moribund) 25 return i; 26 27 c = emalloc(sizeof(Client)); 28 c->writerkick = chancreate(sizeof(void*), 1); 29 c->execpid = chancreate(sizeof(ulong), 0); 30 c->cmd = nocmd; 31 32 c->readerproc = ioproc(); 33 c->writerproc = ioproc(); 34 c->num = nclient; 35 if(nclient%16 == 0) 36 client = erealloc(client, (nclient+16)*sizeof(client[0])); 37 client[nclient++] = c; 38 return nclient-1; 39 } 40 41 void 42 die(Client *c) 43 { 44 Msg *m, *next; 45 Req *r, *rnext; 46 47 c->moribund = 1; 48 kickwriter(c); 49 iointerrupt(c->readerproc); 50 iointerrupt(c->writerproc); 51 if(--c->activethread == 0){ 52 if(c->cmd != nocmd){ 53 free(c->cmd); 54 c->cmd = nocmd; 55 } 56 c->pid = 0; 57 c->moribund = 0; 58 c->status = Closed; 59 for(m=c->mq; m && m != Zmsg; m=next){ 60 next = m->link; 61 free(m); 62 } 63 c->mq = nil; 64 if(c->rq != nil){ 65 for(r=c->rq; r; r=rnext){ 66 rnext = r->aux; 67 respond(r, "hangup"); 68 } 69 c->rq = nil; 70 } 71 if(c->wq != nil){ 72 for(r=c->wq; r; r=rnext){ 73 rnext = r->aux; 74 respond(r, "hangup"); 75 } 76 c->wq = nil; 77 } 78 c->rq = nil; 79 c->wq = nil; 80 c->emq = nil; 81 c->erq = nil; 82 c->ewq = nil; 83 } 84 } 85 86 void 87 closeclient(Client *c) 88 { 89 if(--c->ref == 0){ 90 if(c->pid > 0) 91 postnote(PNPROC, c->pid, "kill"); 92 c->status = Hangup; 93 close(c->fd[0]); 94 c->fd[0] = c->fd[1] = -1; 95 c->moribund = 1; 96 kickwriter(c); 97 iointerrupt(c->readerproc); 98 iointerrupt(c->writerproc); 99 c->activethread++; 100 die(c); 101 } 102 } 103 104 void 105 queuerdreq(Client *c, Req *r) 106 { 107 if(c->rq==nil) 108 c->erq = &c->rq; 109 *c->erq = r; 110 r->aux = nil; 111 c->erq = (Req**)&r->aux; 112 } 113 114 void 115 queuewrreq(Client *c, Req *r) 116 { 117 if(c->wq==nil) 118 c->ewq = &c->wq; 119 *c->ewq = r; 120 r->aux = nil; 121 c->ewq = (Req**)&r->aux; 122 } 123 124 void 125 queuemsg(Client *c, Msg *m) 126 { 127 if(c->mq==nil) 128 c->emq = &c->mq; 129 *c->emq = m; 130 if(m != Zmsg){ 131 m->link = nil; 132 c->emq = (Msg**)&m->link; 133 }else 134 c->emq = nil; 135 } 136 137 void 138 matchmsgs(Client *c) 139 { 140 Req *r; 141 Msg *m; 142 int n, rm; 143 144 while(c->rq && c->mq){ 145 r = c->rq; 146 c->rq = r->aux; 147 148 rm = 0; 149 m = c->mq; 150 if(m == Zmsg){ 151 respond(r, "execnet: no more data"); 152 break; 153 } 154 n = r->ifcall.count; 155 if(n >= m->ep - m->rp){ 156 n = m->ep - m->rp; 157 c->mq = m->link; 158 rm = 1; 159 } 160 if(n) 161 memmove(r->ofcall.data, m->rp, n); 162 if(rm) 163 free(m); 164 else 165 m->rp += n; 166 r->ofcall.count = n; 167 respond(r, nil); 168 } 169 } 170 171 Req* 172 findrdreq(Client *c, Req *r) 173 { 174 Req **l; 175 176 for(l=&c->rq; *l; l=(Req**)&(*l)->aux){ 177 if(*l == r){ 178 *l = r->aux; 179 if(*l == nil) 180 c->erq = l; 181 return r; 182 } 183 } 184 return nil; 185 } 186 187 Req* 188 findwrreq(Client *c, Req *r) 189 { 190 Req **l; 191 192 for(l=&c->wq; *l; l=(Req**)&(*l)->aux){ 193 if(*l == r){ 194 *l = r->aux; 195 if(*l == nil) 196 c->ewq = l; 197 return r; 198 } 199 } 200 return nil; 201 } 202 203 void 204 dataread(Req *r, Client *c) 205 { 206 queuerdreq(c, r); 207 matchmsgs(c); 208 } 209 210 static void 211 readthread(void *a) 212 { 213 uchar *buf; 214 int n; 215 Client *c; 216 Ioproc *io; 217 Msg *m; 218 char tmp[32]; 219 220 c = a; 221 snprint(tmp, sizeof tmp, "read%d", c->num); 222 threadsetname(tmp); 223 224 buf = emalloc(8192); 225 io = c->readerproc; 226 while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){ 227 m = emalloc(sizeof(Msg)+n); 228 m->rp = (uchar*)&m[1]; 229 m->ep = m->rp + n; 230 if(n) 231 memmove(m->rp, buf, n); 232 queuemsg(c, m); 233 matchmsgs(c); 234 } 235 queuemsg(c, Zmsg); 236 free(buf); 237 die(c); 238 } 239 240 static void 241 kickwriter(Client *c) 242 { 243 nbsendp(c->writerkick, nil); 244 } 245 246 void 247 clientflush(Req *or, Client *c) 248 { 249 /* BUG: this leaks Req structures; we should closereq sometimes */ 250 if(or->ifcall.type == Tread) 251 findrdreq(c, or); 252 else{ 253 if(c->execreq == or){ 254 c->execreq = nil; 255 iointerrupt(c->writerproc); 256 } 257 findwrreq(c, or); 258 if(c->curw == or){ 259 c->curw = nil; 260 iointerrupt(c->writerproc); 261 kickwriter(c); 262 } 263 } 264 } 265 266 void 267 datawrite(Req *r, Client *c) 268 { 269 queuewrreq(c, r); 270 kickwriter(c); 271 } 272 273 static void 274 writethread(void *a) 275 { 276 char e[ERRMAX]; 277 uchar *buf; 278 int n; 279 Ioproc *io; 280 Req *r; 281 Client *c; 282 char tmp[32]; 283 284 c = a; 285 snprint(tmp, sizeof tmp, "write%d", c->num); 286 threadsetname(tmp); 287 288 buf = emalloc(8192); 289 io = c->writerproc; 290 for(;;){ 291 while(c->wq == nil){ 292 if(c->moribund) 293 goto Out; 294 recvp(c->writerkick); 295 if(c->moribund) 296 goto Out; 297 } 298 r = c->wq; 299 c->wq = r->aux; 300 c->curw = r; 301 n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count); 302 if(chatty9p) 303 fprint(2, "io->write returns %d\n", n); 304 if(c->curw == nil){ 305 closereq(r); 306 continue; 307 } 308 if(n >= 0){ 309 r->ofcall.count = n; 310 respond(r, nil); 311 }else{ 312 rerrstr(e, sizeof e); 313 respond(r, e); 314 } 315 } 316 Out: 317 free(buf); 318 die(c); 319 } 320 321 static void 322 execproc(void *a) 323 { 324 int i, fd; 325 Client *c; 326 char tmp[32]; 327 328 c = a; 329 snprint(tmp, sizeof tmp, "execproc%d", c->num); 330 threadsetname(tmp); 331 if(pipe(c->fd) < 0){ 332 rerrstr(c->err, sizeof c->err); 333 sendul(c->execpid, -1); 334 return; 335 } 336 rfork(RFFDG); 337 fd = c->fd[1]; 338 close(c->fd[0]); 339 dup(fd, 0); 340 dup(fd, 1); 341 for(i=3; i<100; i++) /* should do better */ 342 close(i); 343 strcpy(c->err, "exec failed"); 344 procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil); 345 } 346 347 static void 348 execthread(void *a) 349 { 350 Client *c; 351 int p; 352 char tmp[32]; 353 354 c = a; 355 snprint(tmp, sizeof tmp, "exec%d", c->num); 356 threadsetname(tmp); 357 c->execpid = chancreate(sizeof(ulong), 0); 358 proccreate(execproc, c, STACK); 359 p = recvul(c->execpid); 360 chanfree(c->execpid); 361 c->execpid = nil; 362 close(c->fd[1]); 363 c->fd[1] = c->fd[0]; 364 if(p != -1){ 365 c->pid = p; 366 c->activethread = 2; 367 threadcreate(readthread, c, STACK); 368 threadcreate(writethread, c, STACK); 369 if(c->execreq) 370 respond(c->execreq, nil); 371 }else{ 372 if(c->execreq) 373 respond(c->execreq, c->err); 374 } 375 } 376 377 void 378 ctlwrite(Req *r, Client *c) 379 { 380 char *f[3], *s, *p; 381 int nf; 382 383 s = emalloc(r->ifcall.count+1); 384 memmove(s, r->ifcall.data, r->ifcall.count); 385 s[r->ifcall.count] = '\0'; 386 387 f[0] = s; 388 p = strchr(s, ' '); 389 if(p == nil) 390 nf = 1; 391 else{ 392 *p++ = '\0'; 393 f[1] = p; 394 nf = 2; 395 } 396 397 if(f[0][0] == '\0'){ 398 free(s); 399 respond(r, nil); 400 return; 401 } 402 403 r->ofcall.count = r->ifcall.count; 404 if(strcmp(f[0], "hangup") == 0){ 405 if(c->pid == 0){ 406 respond(r, "connection already hung up"); 407 goto Out; 408 } 409 postnote(PNPROC, c->pid, "kill"); 410 respond(r, nil); 411 goto Out; 412 } 413 414 if(strcmp(f[0], "connect") == 0){ 415 if(c->cmd != nocmd){ 416 respond(r, "already have connection"); 417 goto Out; 418 } 419 if(nf == 1){ 420 respond(r, "need argument to connect"); 421 goto Out; 422 } 423 c->status = Exec; 424 if(p = strrchr(f[1], '!')) 425 *p = '\0'; 426 c->cmd = emalloc(4+1+strlen(f[1])+1); 427 strcpy(c->cmd, "exec "); 428 strcat(c->cmd, f[1]); 429 c->execreq = r; 430 threadcreate(execthread, c, STACK); 431 goto Out; 432 } 433 434 respond(r, "bad or inappropriate control message"); 435 Out: 436 free(s); 437 } 438