xref: /plan9/sys/src/cmd/execnet/client.c (revision 6b6b9ac8b0b103b1e30e4d019522a78c950fce74)
1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <9p.h>
6 #include "dat.h"
7 
8 int nclient;
9 Client **client;
10 #define Zmsg ((Msg*)~0)
11 char nocmd[] = "";
12 
13 static void readthread(void*);
14 static void writethread(void*);
15 static void kickwriter(Client*);
16 
17 int
newclient(void)18 newclient(void)
19 {
20 	int i;
21 	Client *c;
22 
23 	for(i=0; i<nclient; i++)
24 		if(client[i]->ref==0 && !client[i]->moribund)
25 			return i;
26 
27 	c = emalloc(sizeof(Client));
28 	c->writerkick = chancreate(sizeof(void*), 1);
29 	c->execpid = chancreate(sizeof(ulong), 0);
30 	c->cmd = nocmd;
31 
32 	c->readerproc = ioproc();
33 	c->writerproc = ioproc();
34 	c->num = nclient;
35 	if(nclient%16 == 0)
36 		client = erealloc(client, (nclient+16)*sizeof(client[0]));
37 	client[nclient++] = c;
38 	return nclient-1;
39 }
40 
41 void
die(Client * c)42 die(Client *c)
43 {
44 	Msg *m, *next;
45 	Req *r, *rnext;
46 
47 	c->moribund = 1;
48 	kickwriter(c);
49 	iointerrupt(c->readerproc);
50 	iointerrupt(c->writerproc);
51 	if(--c->activethread == 0){
52 		if(c->cmd != nocmd){
53 			free(c->cmd);
54 			c->cmd = nocmd;
55 		}
56 		c->pid = 0;
57 		c->moribund = 0;
58 		c->status = Closed;
59 		for(m=c->mq; m && m != Zmsg; m=next){
60 			next = m->link;
61 			free(m);
62 		}
63 		c->mq = nil;
64 		if(c->rq != nil){
65 			for(r=c->rq; r; r=rnext){
66 				rnext = r->aux;
67 				respond(r, "hangup");
68 			}
69 			c->rq = nil;
70 		}
71 		if(c->wq != nil){
72 			for(r=c->wq; r; r=rnext){
73 				rnext = r->aux;
74 				respond(r, "hangup");
75 			}
76 			c->wq = nil;
77 		}
78 		c->rq = nil;
79 		c->wq = nil;
80 		c->emq = nil;
81 		c->erq = nil;
82 		c->ewq = nil;
83 	}
84 }
85 
86 void
closeclient(Client * c)87 closeclient(Client *c)
88 {
89 	if(--c->ref == 0){
90 		if(c->pid > 0)
91 			postnote(PNPROC, c->pid, "kill");
92 		c->status = Hangup;
93 		close(c->fd[0]);
94 		c->fd[0] = c->fd[1] = -1;
95 		c->moribund = 1;
96 		kickwriter(c);
97 		iointerrupt(c->readerproc);
98 		iointerrupt(c->writerproc);
99 		c->activethread++;
100 		die(c);
101 	}
102 }
103 
104 void
queuerdreq(Client * c,Req * r)105 queuerdreq(Client *c, Req *r)
106 {
107 	if(c->rq==nil)
108 		c->erq = &c->rq;
109 	*c->erq = r;
110 	r->aux = nil;
111 	c->erq = (Req**)&r->aux;
112 }
113 
114 void
queuewrreq(Client * c,Req * r)115 queuewrreq(Client *c, Req *r)
116 {
117 	if(c->wq==nil)
118 		c->ewq = &c->wq;
119 	*c->ewq = r;
120 	r->aux = nil;
121 	c->ewq = (Req**)&r->aux;
122 }
123 
124 void
queuemsg(Client * c,Msg * m)125 queuemsg(Client *c, Msg *m)
126 {
127 	if(c->mq==nil)
128 		c->emq = &c->mq;
129 	*c->emq = m;
130 	if(m != Zmsg){
131 		m->link = nil;
132 		c->emq = (Msg**)&m->link;
133 	}else
134 		c->emq = nil;
135 }
136 
137 void
matchmsgs(Client * c)138 matchmsgs(Client *c)
139 {
140 	Req *r;
141 	Msg *m;
142 	int n, rm;
143 
144 	while(c->rq && c->mq){
145 		r = c->rq;
146 		c->rq = r->aux;
147 
148 		rm = 0;
149 		m = c->mq;
150 		if(m == Zmsg){
151 			respond(r, "execnet: no more data");
152 			break;
153 		}
154 		n = r->ifcall.count;
155 		if(n >= m->ep - m->rp){
156 			n = m->ep - m->rp;
157 			c->mq = m->link;
158 			rm = 1;
159 		}
160 		if(n)
161 			memmove(r->ofcall.data, m->rp, n);
162 		if(rm)
163 			free(m);
164 		else
165 			m->rp += n;
166 		r->ofcall.count = n;
167 		respond(r, nil);
168 	}
169 }
170 
171 void
findrdreq(Client * c,Req * r)172 findrdreq(Client *c, Req *r)
173 {
174 	Req **l;
175 
176 	for(l=&c->rq; *l; l=(Req**)&(*l)->aux){
177 		if(*l == r){
178 			*l = r->aux;
179 			if(*l == nil)
180 				c->erq = l;
181 			respond(r, "flushed");
182 			break;
183 		}
184 	}
185 }
186 
187 void
findwrreq(Client * c,Req * r)188 findwrreq(Client *c, Req *r)
189 {
190 	Req **l;
191 
192 	for(l=&c->wq; *l; l=(Req**)&(*l)->aux){
193 		if(*l == r){
194 			*l = r->aux;
195 			if(*l == nil)
196 				c->ewq = l;
197 			respond(r, "flushed");
198 			return;
199 		}
200 	}
201 }
202 
203 void
dataread(Req * r,Client * c)204 dataread(Req *r, Client *c)
205 {
206 	queuerdreq(c, r);
207 	matchmsgs(c);
208 }
209 
210 static void
readthread(void * a)211 readthread(void *a)
212 {
213 	uchar *buf;
214 	int n;
215 	Client *c;
216 	Ioproc *io;
217 	Msg *m;
218 	char tmp[32];
219 
220 	c = a;
221 	snprint(tmp, sizeof tmp, "read%d", c->num);
222 	threadsetname(tmp);
223 
224 	buf = emalloc(8192);
225 	io = c->readerproc;
226 	while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){
227 		m = emalloc(sizeof(Msg)+n);
228 		m->rp = (uchar*)&m[1];
229 		m->ep = m->rp + n;
230 		if(n)
231 			memmove(m->rp, buf, n);
232 		queuemsg(c, m);
233 		matchmsgs(c);
234 	}
235 	queuemsg(c, Zmsg);
236 	free(buf);
237 	die(c);
238 }
239 
240 static void
kickwriter(Client * c)241 kickwriter(Client *c)
242 {
243 	nbsendp(c->writerkick, nil);
244 }
245 
246 void
clientflush(Req * or,Client * c)247 clientflush(Req *or, Client *c)
248 {
249 	if(or->ifcall.type == Tread)
250 		findrdreq(c, or);
251 	else{
252 		if(c->execreq == or){
253 			c->execreq = nil;
254 			iointerrupt(c->writerproc);
255 		}
256 		findwrreq(c, or);
257 		if(c->curw == or){
258 			c->curw = nil;
259 			iointerrupt(c->writerproc);
260 			kickwriter(c);
261 		}
262 	}
263 }
264 
265 void
datawrite(Req * r,Client * c)266 datawrite(Req *r, Client *c)
267 {
268 	queuewrreq(c, r);
269 	kickwriter(c);
270 }
271 
272 static void
writethread(void * a)273 writethread(void *a)
274 {
275 	char e[ERRMAX];
276 	uchar *buf;
277 	int n;
278 	Ioproc *io;
279 	Req *r;
280 	Client *c;
281 	char tmp[32];
282 
283 	c = a;
284 	snprint(tmp, sizeof tmp, "write%d", c->num);
285 	threadsetname(tmp);
286 
287 	buf = emalloc(8192);
288 	io = c->writerproc;
289 	for(;;){
290 		while(c->wq == nil){
291 			if(c->moribund)
292 				goto Out;
293 			recvp(c->writerkick);
294 			if(c->moribund)
295 				goto Out;
296 		}
297 		r = c->wq;
298 		c->wq = r->aux;
299 		c->curw = r;
300 		n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count);
301 		if(chatty9p)
302 			fprint(2, "io->write returns %d\n", n);
303 		if(n >= 0){
304 			r->ofcall.count = n;
305 			respond(r, nil);
306 		}else{
307 			rerrstr(e, sizeof e);
308 			respond(r, e);
309 		}
310 	}
311 Out:
312 	free(buf);
313 	die(c);
314 }
315 
316 static void
execproc(void * a)317 execproc(void *a)
318 {
319 	int i, fd;
320 	Client *c;
321 	char tmp[32];
322 
323 	c = a;
324 	snprint(tmp, sizeof tmp, "execproc%d", c->num);
325 	threadsetname(tmp);
326 	if(pipe(c->fd) < 0){
327 		rerrstr(c->err, sizeof c->err);
328 		sendul(c->execpid, -1);
329 		return;
330 	}
331 	rfork(RFFDG);
332 	fd = c->fd[1];
333 	close(c->fd[0]);
334 	dup(fd, 0);
335 	dup(fd, 1);
336 	for(i=3; i<100; i++)	/* should do better */
337 		close(i);
338 	strcpy(c->err, "exec failed");
339 	procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil);
340 }
341 
342 static void
execthread(void * a)343 execthread(void *a)
344 {
345 	Client *c;
346 	int p;
347 	char tmp[32];
348 
349 	c = a;
350 	snprint(tmp, sizeof tmp, "exec%d", c->num);
351 	threadsetname(tmp);
352 	c->execpid = chancreate(sizeof(ulong), 0);
353 	proccreate(execproc, c, STACK);
354 	p = recvul(c->execpid);
355 	chanfree(c->execpid);
356 	c->execpid = nil;
357 	close(c->fd[1]);
358 	c->fd[1] = c->fd[0];
359 	if(p != -1){
360 		c->pid = p;
361 		c->activethread = 2;
362 		threadcreate(readthread, c, STACK);
363 		threadcreate(writethread, c, STACK);
364 		if(c->execreq)
365 			respond(c->execreq, nil);
366 	}else{
367 		if(c->execreq)
368 			respond(c->execreq, c->err);
369 	}
370 }
371 
372 void
ctlwrite(Req * r,Client * c)373 ctlwrite(Req *r, Client *c)
374 {
375 	char *f[3], *s, *p;
376 	int nf;
377 
378 	s = emalloc(r->ifcall.count+1);
379 	memmove(s, r->ifcall.data, r->ifcall.count);
380 	s[r->ifcall.count] = '\0';
381 
382 	f[0] = s;
383 	p = strchr(s, ' ');
384 	if(p == nil)
385 		nf = 1;
386 	else{
387 		*p++ = '\0';
388 		f[1] = p;
389 		nf = 2;
390 	}
391 
392 	if(f[0][0] == '\0'){
393 		free(s);
394 		respond(r, nil);
395 		return;
396 	}
397 
398 	r->ofcall.count = r->ifcall.count;
399 	if(strcmp(f[0], "hangup") == 0){
400 		if(c->pid == 0){
401 			respond(r, "connection already hung up");
402 			goto Out;
403 		}
404 		postnote(PNPROC, c->pid, "kill");
405 		respond(r, nil);
406 		goto Out;
407 	}
408 
409 	if(strcmp(f[0], "connect") == 0){
410 		if(c->cmd != nocmd){
411 			respond(r, "already have connection");
412 			goto Out;
413 		}
414 		if(nf == 1){
415 			respond(r, "need argument to connect");
416 			goto Out;
417 		}
418 		c->status = Exec;
419 		if(p = strrchr(f[1], '!'))
420 			*p = '\0';
421 		c->cmd = emalloc(4+1+strlen(f[1])+1);
422 		strcpy(c->cmd, "exec ");
423 		strcat(c->cmd, f[1]);
424 		c->execreq = r;
425 		threadcreate(execthread, c, STACK);
426 		goto Out;
427 	}
428 
429 	respond(r, "bad or inappropriate control message");
430 Out:
431 	free(s);
432 }
433