1 #include <u.h> 2 #include <libc.h> 3 #include <fcall.h> 4 #include <thread.h> 5 #include <9p.h> 6 #include "dat.h" 7 8 int nclient; 9 Client **client; 10 #define Zmsg ((Msg*)~0) 11 char nocmd[] = ""; 12 13 static void readthread(void*); 14 static void writethread(void*); 15 static void kickwriter(Client*); 16 17 int 18 newclient(void) 19 { 20 int i; 21 Client *c; 22 23 for(i=0; i<nclient; i++) 24 if(client[i]->ref==0 && !client[i]->moribund) 25 return i; 26 27 c = emalloc(sizeof(Client)); 28 c->writerkick = chancreate(sizeof(void*), 1); 29 c->execpid = chancreate(sizeof(ulong), 0); 30 c->cmd = nocmd; 31 32 c->readerproc = ioproc(); 33 c->writerproc = ioproc(); 34 c->num = nclient; 35 if(nclient%16 == 0) 36 client = erealloc(client, (nclient+16)*sizeof(client[0])); 37 client[nclient++] = c; 38 return nclient-1; 39 } 40 41 void 42 die(Client *c) 43 { 44 Msg *m, *next; 45 Req *r, *rnext; 46 47 c->moribund = 1; 48 kickwriter(c); 49 iointerrupt(c->readerproc); 50 iointerrupt(c->writerproc); 51 if(--c->activethread == 0){ 52 if(c->cmd != nocmd){ 53 free(c->cmd); 54 c->cmd = nocmd; 55 } 56 c->pid = 0; 57 c->moribund = 0; 58 c->status = Closed; 59 for(m=c->mq; m && m != Zmsg; m=next){ 60 next = m->link; 61 free(m); 62 } 63 c->mq = nil; 64 if(c->rq != nil){ 65 for(r=c->rq; r; r=rnext){ 66 rnext = r->aux; 67 respond(r, "hangup"); 68 } 69 c->rq = nil; 70 } 71 if(c->wq != nil){ 72 for(r=c->wq; r; r=rnext){ 73 rnext = r->aux; 74 respond(r, "hangup"); 75 } 76 c->wq = nil; 77 } 78 c->rq = nil; 79 c->wq = nil; 80 c->emq = nil; 81 c->erq = nil; 82 c->ewq = nil; 83 } 84 } 85 86 void 87 closeclient(Client *c) 88 { 89 if(--c->ref == 0){ 90 if(c->pid > 0) 91 postnote(PNPROC, c->pid, "kill"); 92 c->status = Hangup; 93 close(c->fd[0]); 94 c->fd[0] = c->fd[1] = -1; 95 c->moribund = 1; 96 kickwriter(c); 97 iointerrupt(c->readerproc); 98 iointerrupt(c->writerproc); 99 c->activethread++; 100 die(c); 101 } 102 } 103 104 void 105 queuerdreq(Client *c, Req *r) 106 { 107 if(c->rq==nil) 108 c->erq = &c->rq; 109 *c->erq = r; 110 r->aux = nil; 111 c->erq = (Req**)&r->aux; 112 } 113 114 void 115 queuewrreq(Client *c, Req *r) 116 { 117 if(c->wq==nil) 118 c->ewq = &c->wq; 119 *c->ewq = r; 120 r->aux = nil; 121 c->ewq = (Req**)&r->aux; 122 } 123 124 void 125 queuemsg(Client *c, Msg *m) 126 { 127 if(c->mq==nil) 128 c->emq = &c->mq; 129 *c->emq = m; 130 if(m != Zmsg){ 131 m->link = nil; 132 c->emq = (Msg**)&m->link; 133 }else 134 c->emq = nil; 135 } 136 137 void 138 matchmsgs(Client *c) 139 { 140 Req *r; 141 Msg *m; 142 int n, rm; 143 144 while(c->rq && c->mq){ 145 r = c->rq; 146 c->rq = r->aux; 147 148 rm = 0; 149 m = c->mq; 150 if(m == Zmsg){ 151 respond(r, "execnet: no more data"); 152 break; 153 } 154 n = r->ifcall.count; 155 if(n >= m->ep - m->rp){ 156 n = m->ep - m->rp; 157 c->mq = m->link; 158 rm = 1; 159 } 160 if(n) 161 memmove(r->ofcall.data, m->rp, n); 162 if(rm) 163 free(m); 164 else 165 m->rp += n; 166 r->ofcall.count = n; 167 respond(r, nil); 168 } 169 } 170 171 void 172 findrdreq(Client *c, Req *r) 173 { 174 Req **l; 175 176 for(l=&c->rq; *l; l=(Req**)&(*l)->aux){ 177 if(*l == r){ 178 *l = r->aux; 179 if(*l == nil) 180 c->erq = l; 181 respond(r, "flushed"); 182 break; 183 } 184 } 185 } 186 187 void 188 findwrreq(Client *c, Req *r) 189 { 190 Req **l; 191 192 for(l=&c->wq; *l; l=(Req**)&(*l)->aux){ 193 if(*l == r){ 194 *l = r->aux; 195 if(*l == nil) 196 c->ewq = l; 197 respond(r, "flushed"); 198 return; 199 } 200 } 201 } 202 203 void 204 dataread(Req *r, Client *c) 205 { 206 queuerdreq(c, r); 207 matchmsgs(c); 208 } 209 210 static void 211 readthread(void *a) 212 { 213 uchar *buf; 214 int n; 215 Client *c; 216 Ioproc *io; 217 Msg *m; 218 char tmp[32]; 219 220 c = a; 221 snprint(tmp, sizeof tmp, "read%d", c->num); 222 threadsetname(tmp); 223 224 buf = emalloc(8192); 225 io = c->readerproc; 226 while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){ 227 m = emalloc(sizeof(Msg)+n); 228 m->rp = (uchar*)&m[1]; 229 m->ep = m->rp + n; 230 if(n) 231 memmove(m->rp, buf, n); 232 queuemsg(c, m); 233 matchmsgs(c); 234 } 235 queuemsg(c, Zmsg); 236 free(buf); 237 die(c); 238 } 239 240 static void 241 kickwriter(Client *c) 242 { 243 nbsendp(c->writerkick, nil); 244 } 245 246 void 247 clientflush(Req *or, Client *c) 248 { 249 if(or->ifcall.type == Tread) 250 findrdreq(c, or); 251 else{ 252 if(c->execreq == or){ 253 c->execreq = nil; 254 iointerrupt(c->writerproc); 255 } 256 findwrreq(c, or); 257 if(c->curw == or){ 258 c->curw = nil; 259 iointerrupt(c->writerproc); 260 kickwriter(c); 261 } 262 } 263 } 264 265 void 266 datawrite(Req *r, Client *c) 267 { 268 queuewrreq(c, r); 269 kickwriter(c); 270 } 271 272 static void 273 writethread(void *a) 274 { 275 char e[ERRMAX]; 276 uchar *buf; 277 int n; 278 Ioproc *io; 279 Req *r; 280 Client *c; 281 char tmp[32]; 282 283 c = a; 284 snprint(tmp, sizeof tmp, "write%d", c->num); 285 threadsetname(tmp); 286 287 buf = emalloc(8192); 288 io = c->writerproc; 289 for(;;){ 290 while(c->wq == nil){ 291 if(c->moribund) 292 goto Out; 293 recvp(c->writerkick); 294 if(c->moribund) 295 goto Out; 296 } 297 r = c->wq; 298 c->wq = r->aux; 299 c->curw = r; 300 n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count); 301 if(chatty9p) 302 fprint(2, "io->write returns %d\n", n); 303 if(n >= 0){ 304 r->ofcall.count = n; 305 respond(r, nil); 306 }else{ 307 rerrstr(e, sizeof e); 308 respond(r, e); 309 } 310 } 311 Out: 312 free(buf); 313 die(c); 314 } 315 316 static void 317 execproc(void *a) 318 { 319 int i, fd; 320 Client *c; 321 char tmp[32]; 322 323 c = a; 324 snprint(tmp, sizeof tmp, "execproc%d", c->num); 325 threadsetname(tmp); 326 if(pipe(c->fd) < 0){ 327 rerrstr(c->err, sizeof c->err); 328 sendul(c->execpid, -1); 329 return; 330 } 331 rfork(RFFDG); 332 fd = c->fd[1]; 333 close(c->fd[0]); 334 dup(fd, 0); 335 dup(fd, 1); 336 for(i=3; i<100; i++) /* should do better */ 337 close(i); 338 strcpy(c->err, "exec failed"); 339 procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil); 340 } 341 342 static void 343 execthread(void *a) 344 { 345 Client *c; 346 int p; 347 char tmp[32]; 348 349 c = a; 350 snprint(tmp, sizeof tmp, "exec%d", c->num); 351 threadsetname(tmp); 352 c->execpid = chancreate(sizeof(ulong), 0); 353 proccreate(execproc, c, STACK); 354 p = recvul(c->execpid); 355 chanfree(c->execpid); 356 c->execpid = nil; 357 close(c->fd[1]); 358 c->fd[1] = c->fd[0]; 359 if(p != -1){ 360 c->pid = p; 361 c->activethread = 2; 362 threadcreate(readthread, c, STACK); 363 threadcreate(writethread, c, STACK); 364 if(c->execreq) 365 respond(c->execreq, nil); 366 }else{ 367 if(c->execreq) 368 respond(c->execreq, c->err); 369 } 370 } 371 372 void 373 ctlwrite(Req *r, Client *c) 374 { 375 char *f[3], *s, *p; 376 int nf; 377 378 s = emalloc(r->ifcall.count+1); 379 memmove(s, r->ifcall.data, r->ifcall.count); 380 s[r->ifcall.count] = '\0'; 381 382 f[0] = s; 383 p = strchr(s, ' '); 384 if(p == nil) 385 nf = 1; 386 else{ 387 *p++ = '\0'; 388 f[1] = p; 389 nf = 2; 390 } 391 392 if(f[0][0] == '\0'){ 393 free(s); 394 respond(r, nil); 395 return; 396 } 397 398 r->ofcall.count = r->ifcall.count; 399 if(strcmp(f[0], "hangup") == 0){ 400 if(c->pid == 0){ 401 respond(r, "connection already hung up"); 402 goto Out; 403 } 404 postnote(PNPROC, c->pid, "kill"); 405 respond(r, nil); 406 goto Out; 407 } 408 409 if(strcmp(f[0], "connect") == 0){ 410 if(c->cmd != nocmd){ 411 respond(r, "already have connection"); 412 goto Out; 413 } 414 if(nf == 1){ 415 respond(r, "need argument to connect"); 416 goto Out; 417 } 418 c->status = Exec; 419 if(p = strrchr(f[1], '!')) 420 *p = '\0'; 421 c->cmd = emalloc(4+1+strlen(f[1])+1); 422 strcpy(c->cmd, "exec "); 423 strcat(c->cmd, f[1]); 424 c->execreq = r; 425 threadcreate(execthread, c, STACK); 426 goto Out; 427 } 428 429 respond(r, "bad or inappropriate control message"); 430 Out: 431 free(s); 432 } 433