xref: /plan9-contrib/sys/src/cmd/execnet/client.c (revision 6a9fc400c33447ef5e1cda7185cb4de2c8e8010e)
1 #include <u.h>
2 #include <libc.h>
3 #include <fcall.h>
4 #include <thread.h>
5 #include <9p.h>
6 #include "dat.h"
7 
8 int nclient;
9 Client **client;
10 #define Zmsg ((Msg*)~0)
11 char nocmd[] = "";
12 
13 static void readthread(void*);
14 static void writethread(void*);
15 static void kickwriter(Client*);
16 
17 int
18 newclient(void)
19 {
20 	int i;
21 	Client *c;
22 
23 	for(i=0; i<nclient; i++)
24 		if(client[i]->ref==0 && !client[i]->moribund)
25 			return i;
26 
27 	c = emalloc(sizeof(Client));
28 	c->writerkick = chancreate(sizeof(void*), 1);
29 	c->execpid = chancreate(sizeof(ulong), 0);
30 	c->cmd = nocmd;
31 
32 	c->readerproc = ioproc();
33 	c->writerproc = ioproc();
34 	c->num = nclient;
35 	if(nclient%16 == 0)
36 		client = erealloc(client, (nclient+16)*sizeof(client[0]));
37 	client[nclient++] = c;
38 	return nclient-1;
39 }
40 
41 void
42 die(Client *c)
43 {
44 	Msg *m, *next;
45 	Req *r, *rnext;
46 
47 	c->moribund = 1;
48 	kickwriter(c);
49 	iointerrupt(c->readerproc);
50 	iointerrupt(c->writerproc);
51 	if(--c->activethread == 0){
52 		if(c->cmd != nocmd){
53 			free(c->cmd);
54 			c->cmd = nocmd;
55 		}
56 		c->pid = 0;
57 		c->moribund = 0;
58 		c->status = Closed;
59 		for(m=c->mq; m && m != Zmsg; m=next){
60 			next = m->link;
61 			free(m);
62 		}
63 		c->mq = nil;
64 		if(c->rq != nil){
65 			for(r=c->rq; r; r=rnext){
66 				rnext = r->aux;
67 				respond(r, "hangup");
68 			}
69 			c->rq = nil;
70 		}
71 		if(c->wq != nil){
72 			for(r=c->wq; r; r=rnext){
73 				rnext = r->aux;
74 				respond(r, "hangup");
75 			}
76 			c->wq = nil;
77 		}
78 		c->rq = nil;
79 		c->wq = nil;
80 		c->emq = nil;
81 		c->erq = nil;
82 		c->ewq = nil;
83 	}
84 }
85 
86 void
87 closeclient(Client *c)
88 {
89 	if(--c->ref == 0){
90 		if(c->pid > 0)
91 			postnote(PNPROC, c->pid, "kill");
92 		c->status = Hangup;
93 		close(c->fd[0]);
94 		c->fd[0] = c->fd[1] = -1;
95 		c->moribund = 1;
96 		kickwriter(c);
97 		iointerrupt(c->readerproc);
98 		iointerrupt(c->writerproc);
99 		c->activethread++;
100 		die(c);
101 	}
102 }
103 
104 void
105 queuerdreq(Client *c, Req *r)
106 {
107 	if(c->rq==nil)
108 		c->erq = &c->rq;
109 	*c->erq = r;
110 	r->aux = nil;
111 	c->erq = (Req**)&r->aux;
112 }
113 
114 void
115 queuewrreq(Client *c, Req *r)
116 {
117 	if(c->wq==nil)
118 		c->ewq = &c->wq;
119 	*c->ewq = r;
120 	r->aux = nil;
121 	c->ewq = (Req**)&r->aux;
122 }
123 
124 void
125 queuemsg(Client *c, Msg *m)
126 {
127 	if(c->mq==nil)
128 		c->emq = &c->mq;
129 	*c->emq = m;
130 	if(m != Zmsg){
131 		m->link = nil;
132 		c->emq = (Msg**)&m->link;
133 	}else
134 		c->emq = nil;
135 }
136 
137 void
138 matchmsgs(Client *c)
139 {
140 	Req *r;
141 	Msg *m;
142 	int n, rm;
143 
144 	while(c->rq && c->mq){
145 		r = c->rq;
146 		c->rq = r->aux;
147 
148 		rm = 0;
149 		m = c->mq;
150 		if(m == Zmsg){
151 			respond(r, "execnet: no more data");
152 			break;
153 		}
154 		n = r->ifcall.count;
155 		if(n >= m->ep - m->rp){
156 			n = m->ep - m->rp;
157 			c->mq = m->link;
158 			rm = 1;
159 		}
160 		if(n)
161 			memmove(r->ofcall.data, m->rp, n);
162 		if(rm)
163 			free(m);
164 		else
165 			m->rp += n;
166 		r->ofcall.count = n;
167 		respond(r, nil);
168 	}
169 }
170 
171 Req*
172 findrdreq(Client *c, Req *r)
173 {
174 	Req **l;
175 
176 	for(l=&c->rq; *l; l=(Req**)&(*l)->aux){
177 		if(*l == r){
178 			*l = r->aux;
179 			if(*l == nil)
180 				c->erq = l;
181 			return r;
182 		}
183 	}
184 	return nil;
185 }
186 
187 Req*
188 findwrreq(Client *c, Req *r)
189 {
190 	Req **l;
191 
192 	for(l=&c->wq; *l; l=(Req**)&(*l)->aux){
193 		if(*l == r){
194 			*l = r->aux;
195 			if(*l == nil)
196 				c->ewq = l;
197 			return r;
198 		}
199 	}
200 	return nil;
201 }
202 
203 void
204 dataread(Req *r, Client *c)
205 {
206 	queuerdreq(c, r);
207 	matchmsgs(c);
208 }
209 
210 static void
211 readthread(void *a)
212 {
213 	uchar *buf;
214 	int n;
215 	Client *c;
216 	Ioproc *io;
217 	Msg *m;
218 	char tmp[32];
219 
220 	c = a;
221 	snprint(tmp, sizeof tmp, "read%d", c->num);
222 	threadsetname(tmp);
223 
224 	buf = emalloc(8192);
225 	io = c->readerproc;
226 	while((n = ioread(io, c->fd[0], buf, 8192)) >= 0){
227 		m = emalloc(sizeof(Msg)+n);
228 		m->rp = (uchar*)&m[1];
229 		m->ep = m->rp + n;
230 		if(n)
231 			memmove(m->rp, buf, n);
232 		queuemsg(c, m);
233 		matchmsgs(c);
234 	}
235 	queuemsg(c, Zmsg);
236 	free(buf);
237 	die(c);
238 }
239 
240 static void
241 kickwriter(Client *c)
242 {
243 	nbsendp(c->writerkick, nil);
244 }
245 
246 void
247 clientflush(Req *or, Client *c)
248 {
249 	/* BUG: this leaks Req structures; we should closereq sometimes */
250 	if(or->ifcall.type == Tread)
251 		findrdreq(c, or);
252 	else{
253 		if(c->execreq == or){
254 			c->execreq = nil;
255 			iointerrupt(c->writerproc);
256 		}
257 		findwrreq(c, or);
258 		if(c->curw == or){
259 			c->curw = nil;
260 			iointerrupt(c->writerproc);
261 			kickwriter(c);
262 		}
263 	}
264 }
265 
266 void
267 datawrite(Req *r, Client *c)
268 {
269 	queuewrreq(c, r);
270 	kickwriter(c);
271 }
272 
273 static void
274 writethread(void *a)
275 {
276 	char e[ERRMAX];
277 	uchar *buf;
278 	int n;
279 	Ioproc *io;
280 	Req *r;
281 	Client *c;
282 	char tmp[32];
283 
284 	c = a;
285 	snprint(tmp, sizeof tmp, "write%d", c->num);
286 	threadsetname(tmp);
287 
288 	buf = emalloc(8192);
289 	io = c->writerproc;
290 	for(;;){
291 		while(c->wq == nil){
292 			if(c->moribund)
293 				goto Out;
294 			recvp(c->writerkick);
295 			if(c->moribund)
296 				goto Out;
297 		}
298 		r = c->wq;
299 		c->wq = r->aux;
300 		c->curw = r;
301 		n = iowrite(io, c->fd[1], r->ifcall.data, r->ifcall.count);
302 		if(chatty9p)
303 			fprint(2, "io->write returns %d\n", n);
304 		if(c->curw == nil){
305 			closereq(r);
306 			continue;
307 		}
308 		if(n >= 0){
309 			r->ofcall.count = n;
310 			respond(r, nil);
311 		}else{
312 			rerrstr(e, sizeof e);
313 			respond(r, e);
314 		}
315 	}
316 Out:
317 	free(buf);
318 	die(c);
319 }
320 
321 static void
322 execproc(void *a)
323 {
324 	int i, fd;
325 	Client *c;
326 	char tmp[32];
327 
328 	c = a;
329 	snprint(tmp, sizeof tmp, "execproc%d", c->num);
330 	threadsetname(tmp);
331 	if(pipe(c->fd) < 0){
332 		rerrstr(c->err, sizeof c->err);
333 		sendul(c->execpid, -1);
334 		return;
335 	}
336 	rfork(RFFDG);
337 	fd = c->fd[1];
338 	close(c->fd[0]);
339 	dup(fd, 0);
340 	dup(fd, 1);
341 	for(i=3; i<100; i++)	/* should do better */
342 		close(i);
343 	strcpy(c->err, "exec failed");
344 	procexecl(c->execpid, "/bin/rc", "rc", "-c", c->cmd, nil);
345 }
346 
347 static void
348 execthread(void *a)
349 {
350 	Client *c;
351 	int p;
352 	char tmp[32];
353 
354 	c = a;
355 	snprint(tmp, sizeof tmp, "exec%d", c->num);
356 	threadsetname(tmp);
357 	c->execpid = chancreate(sizeof(ulong), 0);
358 	proccreate(execproc, c, STACK);
359 	p = recvul(c->execpid);
360 	chanfree(c->execpid);
361 	c->execpid = nil;
362 	close(c->fd[1]);
363 	c->fd[1] = c->fd[0];
364 	if(p != -1){
365 		c->pid = p;
366 		c->activethread = 2;
367 		threadcreate(readthread, c, STACK);
368 		threadcreate(writethread, c, STACK);
369 		if(c->execreq)
370 			respond(c->execreq, nil);
371 	}else{
372 		if(c->execreq)
373 			respond(c->execreq, c->err);
374 	}
375 }
376 
377 void
378 ctlwrite(Req *r, Client *c)
379 {
380 	char *f[3], *s, *p;
381 	int nf;
382 
383 	s = emalloc(r->ifcall.count+1);
384 	memmove(s, r->ifcall.data, r->ifcall.count);
385 	s[r->ifcall.count] = '\0';
386 
387 	f[0] = s;
388 	p = strchr(s, ' ');
389 	if(p == nil)
390 		nf = 1;
391 	else{
392 		*p++ = '\0';
393 		f[1] = p;
394 		nf = 2;
395 	}
396 
397 	if(f[0][0] == '\0'){
398 		free(s);
399 		respond(r, nil);
400 		return;
401 	}
402 
403 	r->ofcall.count = r->ifcall.count;
404 	if(strcmp(f[0], "hangup") == 0){
405 		if(c->pid == 0){
406 			respond(r, "connection already hung up");
407 			goto Out;
408 		}
409 		postnote(PNPROC, c->pid, "kill");
410 		respond(r, nil);
411 		goto Out;
412 	}
413 
414 	if(strcmp(f[0], "connect") == 0){
415 		if(c->cmd != nocmd){
416 			respond(r, "already have connection");
417 			goto Out;
418 		}
419 		if(nf == 1){
420 			respond(r, "need argument to connect");
421 			goto Out;
422 		}
423 		c->status = Exec;
424 		if(p = strrchr(f[1], '!'))
425 			*p = '\0';
426 		c->cmd = emalloc(4+1+strlen(f[1])+1);
427 		strcpy(c->cmd, "exec ");
428 		strcat(c->cmd, f[1]);
429 		c->execreq = r;
430 		threadcreate(execthread, c, STACK);
431 		goto Out;
432 	}
433 
434 	respond(r, "bad or inappropriate control message");
435 Out:
436 	free(s);
437 }
438