1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <9p.h>
6 #include "dat.h"
7
8 int nclient;
9 Client **client;
10 #define Zmsg ((Msg*)~0)
11 char nocmd[] = "";
12
13 static void readthread(void*);
14 static void writethread(void*);
15 static void kickwriter(Client*);
16
17 int
newclient(void)18 newclient(void)
19 {
20 int i;
21 Client *c;
22
23 for(i=0; i<nclient; i++)
24 if(client[i]->ref==0 && !client[i]->moribund)
25 return i;
26
27 c = emalloc(sizeof(Client));
28 c->writerkick = chancreate(sizeof(void*), 1);
29 c->execpid = chancreate(sizeof(ulong), 0);
30 c->cmd = nocmd;
31
32 c->readerproc = ioproc();
33 c->writerproc = ioproc();
34 c->num = nclient;
35 if(nclient%16 == 0)
36 client = erealloc(client, (nclient+16)*sizeof(client[0]));
37 client[nclient++] = c;
38 return nclient-1;
39 }
40
41 void
die(Client * c)42 die(Client *c)
43 {
44 Msg *m, *next;
45 Req *r, *rnext;
46
47 c->moribund = 1;
48 kickwriter(c);
49 iointerrupt(c->readerproc);
50 iointerrupt(c->writerproc);
51 if(--c->activethread == 0){
52 if(c->cmd != nocmd){
53 free(c->cmd);
54 c->cmd = nocmd;
55 }
56 c->pid = 0;
57 c->moribund = 0;
58 c->status = Closed;
59 for(m=c->mq; m && m != Zmsg; m=next){
60 next = m->link;
61 free(m);
62 }
63 c->mq = nil;
64 if(c->rq != nil){
65 for(r=c->rq; r; r=rnext){
66 rnext = r->aux;
67 respond(r, "hangup");
68 }
69 c->rq = nil;
70 }
71 if(c->wq != nil){
72 for(r=c->wq; r; r=rnext){
73 rnext = r->aux;
74 respond(r, "hangup");
75 }
76 c->wq = nil;
77 }
78 c->rq = nil;
79 c->wq = nil;
80 c->emq = nil;
81 c->erq = nil;
82 c->ewq = nil;
83 }
84 }
85
86 void
closeclient(Client * c)87 closeclient(Client *c)
88 {
89 if(--c->ref == 0){
90 if(c->pid > 0)
91 postnote(PNPROC, c->pid, "kill");
92 c->status = Hangup;
93 close(c->fd[0]);
94 c->fd[0] = c->fd[1] = -1;
95 c->moribund = 1;
96 kickwriter(c);
97 iointerrupt(c->readerproc);
98 iointerrupt(c->writerproc);
99 c->activethread++;
100 die(c);
101 }
102 }
103
104 void
queuerdreq(Client * c,Req * r)105 queuerdreq(Client *c, Req *r)
106 {
107 if(c->rq==nil)
108 c->erq = &c->rq;
109 *c->erq = r;
110 r->aux = nil;
111 c->erq = (Req**)&r->aux;
112 }
113
114 void
queuewrreq(Client * c,Req * r)115 queuewrreq(Client *c, Req *r)
116 {
117 if(c->wq==nil)
118 c->ewq = &c->wq;
119 *c->ewq = r;
120 r->aux = nil;
121 c->ewq = (Req**)&r->aux;
122 }
123
124 void
queuemsg(Client * c,Msg * m)125 queuemsg(Client *c, Msg *m)
126 {
127 if(c->mq==nil)
128 c->emq = &c->mq;
129 *c->emq = m;
130 if(m != Zmsg){
131 m->link = nil;
132 c->emq = (Msg**)&m->link;
133 }else
134 c->emq = nil;
135 }
136
137 void
matchmsgs(Client * c)138 matchmsgs(Client *c)
139 {
140 Req *r;
141 Msg *m;
142 int n, rm;
143
144 while(c->rq && c->mq){
145 r = c->rq;
146 c->rq = r->aux;
147
148 rm = 0;
149 m = c->mq;
150 if(m == Zmsg){
151 respond(r, "execnet: no more data");
152 break;
153 }
154 n = r->ifcall.count;
155 if(n >= m->ep - m->rp){
156 n = m->ep - m->rp;
157 c->mq = m->link;
158 rm = 1;
159 }
160 if(n)
161 memmove(r->ofcall.data, m->rp, n);
162 if(rm)
163 free(m);
164 else
165 m->rp += n;
166 r->ofcall.count = n;
167 respond(r, nil);
168 }
169 }
170
171 void
findrdreq(Client * c,Req * r)172 findrdreq(Client *c, Req *r)
173 {
174 Req **l;
175
176 for(l=&c->rq; *l; l=(Req**)&(*l)->aux){
177 if(*l == r){
178 *l = r->aux;
179 if(*l == nil)
180 c->erq = l;
181 respond(r, "flushed");
182 break;
183 }
184 }
185 }
186
187 void
findwrreq(Client * c,Req * r)188 findwrreq(Client *c, Req *r)
189 {
190 Req **l;
191
192 for(l=&c->wq; *l; l=(Req**)&(*l)->aux){
193 if(*l == r){
194 *l = r->aux;
195 if(*l == nil)
196 c->ewq = l;
197 respond(r, "flushed");
198 return;
199 }
200 }
201 }
202
203 void
dataread(Req * r,Client * c)204 dataread(Req *r, Client *c)
205 {
206 queuerdreq(c, r);
207 matchmsgs(c);
208 }
209
210 static void
readthread(void * a)211 readthread(void *a)
212 {
213 uchar *buf;
214 int n;
215 Client *c;
216 Ioproc *io;
217 Msg *m;
218 char tmp[32];
219
220 c = a;
221 snprint(tmp, sizeof tmp, "read%d", c->num);
222 threadsetname(tmp);
223
224 buf = emalloc(8192);
225 io = c->readerproc;
226 while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){
227 m = emalloc(sizeof(Msg)+n);
228 m->rp = (uchar*)&m[1];
229 m->ep = m->rp + n;
230 if(n)
231 memmove(m->rp, buf, n);
232 queuemsg(c, m);
233 matchmsgs(c);
234 }
235 queuemsg(c, Zmsg);
236 free(buf);
237 die(c);
238 }
239
240 static void
kickwriter(Client * c)241 kickwriter(Client *c)
242 {
243 nbsendp(c->writerkick, nil);
244 }
245
246 void
clientflush(Req * or,Client * c)247 clientflush(Req *or, Client *c)
248 {
249 if(or->ifcall.type == Tread)
250 findrdreq(c, or);
251 else{
252 if(c->execreq == or){
253 c->execreq = nil;
254 iointerrupt(c->writerproc);
255 }
256 findwrreq(c, or);
257 if(c->curw == or){
258 c->curw = nil;
259 iointerrupt(c->writerproc);
260 kickwriter(c);
261 }
262 }
263 }
264
265 void
datawrite(Req * r,Client * c)266 datawrite(Req *r, Client *c)
267 {
268 queuewrreq(c, r);
269 kickwriter(c);
270 }
271
272 static void
writethread(void * a)273 writethread(void *a)
274 {
275 char e[ERRMAX];
276 uchar *buf;
277 int n;
278 Ioproc *io;
279 Req *r;
280 Client *c;
281 char tmp[32];
282
283 c = a;
284 snprint(tmp, sizeof tmp, "write%d", c->num);
285 threadsetname(tmp);
286
287 buf = emalloc(8192);
288 io = c->writerproc;
289 for(;;){
290 while(c->wq == nil){
291 if(c->moribund)
292 goto Out;
293 recvp(c->writerkick);
294 if(c->moribund)
295 goto Out;
296 }
297 r = c->wq;
298 c->wq = r->aux;
299 c->curw = r;
300 n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count);
301 if(chatty9p)
302 fprint(2, "io->write returns %d\n", n);
303 if(n >= 0){
304 r->ofcall.count = n;
305 respond(r, nil);
306 }else{
307 rerrstr(e, sizeof e);
308 respond(r, e);
309 }
310 }
311 Out:
312 free(buf);
313 die(c);
314 }
315
316 static void
execproc(void * a)317 execproc(void *a)
318 {
319 int i, fd;
320 Client *c;
321 char tmp[32];
322
323 c = a;
324 snprint(tmp, sizeof tmp, "execproc%d", c->num);
325 threadsetname(tmp);
326 if(pipe(c->fd) < 0){
327 rerrstr(c->err, sizeof c->err);
328 sendul(c->execpid, -1);
329 return;
330 }
331 rfork(RFFDG);
332 fd = c->fd[1];
333 close(c->fd[0]);
334 dup(fd, 0);
335 dup(fd, 1);
336 for(i=3; i<100; i++) /* should do better */
337 close(i);
338 strcpy(c->err, "exec failed");
339 procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil);
340 }
341
342 static void
execthread(void * a)343 execthread(void *a)
344 {
345 Client *c;
346 int p;
347 char tmp[32];
348
349 c = a;
350 snprint(tmp, sizeof tmp, "exec%d", c->num);
351 threadsetname(tmp);
352 c->execpid = chancreate(sizeof(ulong), 0);
353 proccreate(execproc, c, STACK);
354 p = recvul(c->execpid);
355 chanfree(c->execpid);
356 c->execpid = nil;
357 close(c->fd[1]);
358 c->fd[1] = c->fd[0];
359 if(p != -1){
360 c->pid = p;
361 c->activethread = 2;
362 threadcreate(readthread, c, STACK);
363 threadcreate(writethread, c, STACK);
364 if(c->execreq)
365 respond(c->execreq, nil);
366 }else{
367 if(c->execreq)
368 respond(c->execreq, c->err);
369 }
370 }
371
372 void
ctlwrite(Req * r,Client * c)373 ctlwrite(Req *r, Client *c)
374 {
375 char *f[3], *s, *p;
376 int nf;
377
378 s = emalloc(r->ifcall.count+1);
379 memmove(s, r->ifcall.data, r->ifcall.count);
380 s[r->ifcall.count] = '\0';
381
382 f[0] = s;
383 p = strchr(s, ' ');
384 if(p == nil)
385 nf = 1;
386 else{
387 *p++ = '\0';
388 f[1] = p;
389 nf = 2;
390 }
391
392 if(f[0][0] == '\0'){
393 free(s);
394 respond(r, nil);
395 return;
396 }
397
398 r->ofcall.count = r->ifcall.count;
399 if(strcmp(f[0], "hangup") == 0){
400 if(c->pid == 0){
401 respond(r, "connection already hung up");
402 goto Out;
403 }
404 postnote(PNPROC, c->pid, "kill");
405 respond(r, nil);
406 goto Out;
407 }
408
409 if(strcmp(f[0], "connect") == 0){
410 if(c->cmd != nocmd){
411 respond(r, "already have connection");
412 goto Out;
413 }
414 if(nf == 1){
415 respond(r, "need argument to connect");
416 goto Out;
417 }
418 c->status = Exec;
419 if(p = strrchr(f[1], '!'))
420 *p = '\0';
421 c->cmd = emalloc(4+1+strlen(f[1])+1);
422 strcpy(c->cmd, "exec ");
423 strcat(c->cmd, f[1]);
424 c->execreq = r;
425 threadcreate(execthread, c, STACK);
426 goto Out;
427 }
428
429 respond(r, "bad or inappropriate control message");
430 Out:
431 free(s);
432 }
433