xref: /plan9/sys/src/cmd/unix/drawterm/kern/devpipe.c (revision 8ccd4a6360d974db7bd7bbd4f37e7018419ea908)
1 #include	"u.h"
2 #include	"lib.h"
3 #include	"dat.h"
4 #include	"fns.h"
5 #include	"error.h"
6 
7 #include	"netif.h"
8 
9 typedef struct Pipe	Pipe;
10 struct Pipe
11 {
12 	QLock lk;
13 	Pipe	*next;
14 	int	ref;
15 	ulong	path;
16 	Queue	*q[2];
17 	int	qref[2];
18 };
19 
20 struct
21 {
22 	Lock lk;
23 	ulong	path;
24 } pipealloc;
25 
26 enum
27 {
28 	Qdir,
29 	Qdata0,
30 	Qdata1,
31 };
32 
33 Dirtab pipedir[] =
34 {
35 	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
36 	"data",		{Qdata0},	0,		0600,
37 	"data1",	{Qdata1},	0,		0600,
38 };
39 #define NPIPEDIR 3
40 
41 static void
pipeinit(void)42 pipeinit(void)
43 {
44 	if(conf.pipeqsize == 0){
45 		if(conf.nmach > 1)
46 			conf.pipeqsize = 256*1024;
47 		else
48 			conf.pipeqsize = 32*1024;
49 	}
50 }
51 
52 /*
53  *  create a pipe, no streams are created until an open
54  */
55 static Chan*
pipeattach(char * spec)56 pipeattach(char *spec)
57 {
58 	Pipe *p;
59 	Chan *c;
60 
61 	c = devattach('|', spec);
62 	p = malloc(sizeof(Pipe));
63 	if(p == 0)
64 		exhausted("memory");
65 	p->ref = 1;
66 
67 	p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
68 	if(p->q[0] == 0){
69 		free(p);
70 		exhausted("memory");
71 	}
72 	p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
73 	if(p->q[1] == 0){
74 		free(p->q[0]);
75 		free(p);
76 		exhausted("memory");
77 	}
78 
79 	lock(&pipealloc.lk);
80 	p->path = ++pipealloc.path;
81 	unlock(&pipealloc.lk);
82 
83 	mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
84 	c->aux = p;
85 	c->dev = 0;
86 	return c;
87 }
88 
89 static int
pipegen(Chan * c,char * name,Dirtab * tab,int ntab,int i,Dir * dp)90 pipegen(Chan *c, char *name, Dirtab *tab, int ntab, int i, Dir *dp)
91 {
92 	Qid q;
93 	int len;
94 	Pipe *p;
95 
96 	USED(name);
97 
98 	if(i == DEVDOTDOT){
99 		devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
100 		return 1;
101 	}
102 	i++;	/* skip . */
103 	if(tab==0 || i>=ntab)
104 		return -1;
105 
106 	tab += i;
107 	p = c->aux;
108 	switch((ulong)tab->qid.path){
109 	case Qdata0:
110 		len = qlen(p->q[0]);
111 		break;
112 	case Qdata1:
113 		len = qlen(p->q[1]);
114 		break;
115 	default:
116 		len = tab->length;
117 		break;
118 	}
119 	mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
120 	devdir(c, q, tab->name, len, eve, tab->perm, dp);
121 	return 1;
122 }
123 
124 
125 static Walkqid*
pipewalk(Chan * c,Chan * nc,char ** name,int nname)126 pipewalk(Chan *c, Chan *nc, char **name, int nname)
127 {
128 	Walkqid *wq;
129 	Pipe *p;
130 
131 	wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
132 	if(wq != nil && wq->clone != nil && wq->clone != c){
133 		p = c->aux;
134 		qlock(&p->lk);
135 		p->ref++;
136 		if(c->flag & COPEN){
137 			print("channel open in pipewalk\n");
138 			switch(NETTYPE(c->qid.path)){
139 			case Qdata0:
140 				p->qref[0]++;
141 				break;
142 			case Qdata1:
143 				p->qref[1]++;
144 				break;
145 			}
146 		}
147 		qunlock(&p->lk);
148 	}
149 	return wq;
150 }
151 
152 static int
pipestat(Chan * c,uchar * db,int n)153 pipestat(Chan *c, uchar *db, int n)
154 {
155 	Pipe *p;
156 	Dir dir;
157 
158 	p = c->aux;
159 
160 	switch(NETTYPE(c->qid.path)){
161 	case Qdir:
162 		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
163 		break;
164 	case Qdata0:
165 		devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
166 		break;
167 	case Qdata1:
168 		devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
169 		break;
170 	default:
171 		panic("pipestat");
172 	}
173 	n = convD2M(&dir, db, n);
174 	if(n < BIT16SZ)
175 		error(Eshortstat);
176 	return n;
177 }
178 
179 /*
180  *  if the stream doesn't exist, create it
181  */
182 static Chan*
pipeopen(Chan * c,int omode)183 pipeopen(Chan *c, int omode)
184 {
185 	Pipe *p;
186 
187 	if(c->qid.type & QTDIR){
188 		if(omode != OREAD)
189 			error(Ebadarg);
190 		c->mode = omode;
191 		c->flag |= COPEN;
192 		c->offset = 0;
193 		return c;
194 	}
195 
196 	p = c->aux;
197 	qlock(&p->lk);
198 	switch(NETTYPE(c->qid.path)){
199 	case Qdata0:
200 		p->qref[0]++;
201 		break;
202 	case Qdata1:
203 		p->qref[1]++;
204 		break;
205 	}
206 	qunlock(&p->lk);
207 
208 	c->mode = openmode(omode);
209 	c->flag |= COPEN;
210 	c->offset = 0;
211 	c->iounit = qiomaxatomic;
212 	return c;
213 }
214 
215 static void
pipeclose(Chan * c)216 pipeclose(Chan *c)
217 {
218 	Pipe *p;
219 
220 	p = c->aux;
221 	qlock(&p->lk);
222 
223 	if(c->flag & COPEN){
224 		/*
225 		 *  closing either side hangs up the stream
226 		 */
227 		switch(NETTYPE(c->qid.path)){
228 		case Qdata0:
229 			p->qref[0]--;
230 			if(p->qref[0] == 0){
231 				qhangup(p->q[1], 0);
232 				qclose(p->q[0]);
233 			}
234 			break;
235 		case Qdata1:
236 			p->qref[1]--;
237 			if(p->qref[1] == 0){
238 				qhangup(p->q[0], 0);
239 				qclose(p->q[1]);
240 			}
241 			break;
242 		}
243 	}
244 
245 
246 	/*
247 	 *  if both sides are closed, they are reusable
248 	 */
249 	if(p->qref[0] == 0 && p->qref[1] == 0){
250 		qreopen(p->q[0]);
251 		qreopen(p->q[1]);
252 	}
253 
254 	/*
255 	 *  free the structure on last close
256 	 */
257 	p->ref--;
258 	if(p->ref == 0){
259 		qunlock(&p->lk);
260 		free(p->q[0]);
261 		free(p->q[1]);
262 		free(p);
263 	} else
264 		qunlock(&p->lk);
265 }
266 
267 static long
piperead(Chan * c,void * va,long n,vlong offset)268 piperead(Chan *c, void *va, long n, vlong offset)
269 {
270 	Pipe *p;
271 
272 	USED(offset);
273 
274 	p = c->aux;
275 
276 	switch(NETTYPE(c->qid.path)){
277 	case Qdir:
278 		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
279 	case Qdata0:
280 		return qread(p->q[0], va, n);
281 	case Qdata1:
282 		return qread(p->q[1], va, n);
283 	default:
284 		panic("piperead");
285 	}
286 	return -1;	/* not reached */
287 }
288 
289 static Block*
pipebread(Chan * c,long n,ulong offset)290 pipebread(Chan *c, long n, ulong offset)
291 {
292 	Pipe *p;
293 
294 	p = c->aux;
295 
296 	switch(NETTYPE(c->qid.path)){
297 	case Qdata0:
298 		return qbread(p->q[0], n);
299 	case Qdata1:
300 		return qbread(p->q[1], n);
301 	}
302 
303 	return devbread(c, n, offset);
304 }
305 
306 /*
307  *  a write to a closed pipe causes a note to be sent to
308  *  the process.
309  */
310 static long
pipewrite(Chan * c,void * va,long n,vlong offset)311 pipewrite(Chan *c, void *va, long n, vlong offset)
312 {
313 	Pipe *p;
314 
315 	USED(offset);
316 	if(!islo())
317 		print("pipewrite hi %lux\n", getcallerpc(&c));
318 
319 	if(waserror()) {
320 		/* avoid notes when pipe is a mounted queue */
321 		if((c->flag & CMSG) == 0)
322 			postnote(up, 1, "sys: write on closed pipe", NUser);
323 		nexterror();
324 	}
325 
326 	p = c->aux;
327 
328 	switch(NETTYPE(c->qid.path)){
329 	case Qdata0:
330 		n = qwrite(p->q[1], va, n);
331 		break;
332 
333 	case Qdata1:
334 		n = qwrite(p->q[0], va, n);
335 		break;
336 
337 	default:
338 		panic("pipewrite");
339 	}
340 
341 	poperror();
342 	return n;
343 }
344 
345 static long
pipebwrite(Chan * c,Block * bp,ulong offset)346 pipebwrite(Chan *c, Block *bp, ulong offset)
347 {
348 	long n;
349 	Pipe *p;
350 
351 	USED(offset);
352 
353 	if(waserror()) {
354 		/* avoid notes when pipe is a mounted queue */
355 		if((c->flag & CMSG) == 0)
356 			postnote(up, 1, "sys: write on closed pipe", NUser);
357 		nexterror();
358 	}
359 
360 	p = c->aux;
361 	switch(NETTYPE(c->qid.path)){
362 	case Qdata0:
363 		n = qbwrite(p->q[1], bp);
364 		break;
365 
366 	case Qdata1:
367 		n = qbwrite(p->q[0], bp);
368 		break;
369 
370 	default:
371 		n = 0;
372 		panic("pipebwrite");
373 	}
374 
375 	poperror();
376 	return n;
377 }
378 
379 Dev pipedevtab = {
380 	'|',
381 	"pipe",
382 
383 	devreset,
384 	pipeinit,
385 	devshutdown,
386 	pipeattach,
387 	pipewalk,
388 	pipestat,
389 	pipeopen,
390 	devcreate,
391 	pipeclose,
392 	piperead,
393 	pipebread,
394 	pipewrite,
395 	pipebwrite,
396 	devremove,
397 	devwstat,
398 };
399