xref: /plan9-contrib/sys/src/9/port/devpipe.c (revision ec59a3ddbfceee0efe34584c2c9981a5e5ff1ec4)
1 #include	"u.h"
2 #include	"../port/lib.h"
3 #include	"mem.h"
4 #include	"dat.h"
5 #include	"fns.h"
6 #include	"../port/error.h"
7 
8 #include	"netif.h"
9 
10 typedef struct Pipe	Pipe;
11 struct Pipe
12 {
13 	QLock;
14 	Pipe	*next;
15 	int	ref;
16 	ulong	path;
17 	Queue	*q[2];
18 	int	qref[2];
19 };
20 
21 struct
22 {
23 	Lock;
24 	ulong	path;
25 } pipealloc;
26 
27 enum
28 {
29 	Qdir,
30 	Qdata0,
31 	Qdata1,
32 };
33 
34 Dirtab pipedir[] =
35 {
36 	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
37 	"data",		{Qdata0},	0,		0600,
38 	"data1",	{Qdata1},	0,		0600,
39 };
40 #define NPIPEDIR 3
41 
42 static void
43 pipeinit(void)
44 {
45 	if(conf.pipeqsize == 0){
46 		if(conf.nmach > 1)
47 			conf.pipeqsize = 256*1024;
48 		else
49 			conf.pipeqsize = 32*1024;
50 	}
51 }
52 
53 /*
54  *  create a pipe, no streams are created until an open
55  */
56 static Chan*
57 pipeattach(char *spec)
58 {
59 	Pipe *p;
60 	Chan *c;
61 
62 	c = devattach('|', spec);
63 	p = malloc(sizeof(Pipe));
64 	if(p == 0)
65 		exhausted("memory");
66 	p->ref = 1;
67 
68 	p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
69 	if(p->q[0] == 0){
70 		free(p);
71 		exhausted("memory");
72 	}
73 	p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
74 	if(p->q[1] == 0){
75 		free(p->q[0]);
76 		free(p);
77 		exhausted("memory");
78 	}
79 
80 	lock(&pipealloc);
81 	p->path = ++pipealloc.path;
82 	unlock(&pipealloc);
83 
84 	mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
85 	c->aux = p;
86 	c->dev = 0;
87 	return c;
88 }
89 
90 static int
91 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
92 {
93 	Qid q;
94 	int len;
95 	Pipe *p;
96 
97 	if(i == DEVDOTDOT){
98 		devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
99 		return 1;
100 	}
101 	i++;	/* skip . */
102 	if(tab==0 || i>=ntab)
103 		return -1;
104 
105 	tab += i;
106 	p = c->aux;
107 	switch((ulong)tab->qid.path){
108 	case Qdata0:
109 		len = qlen(p->q[0]);
110 		break;
111 	case Qdata1:
112 		len = qlen(p->q[1]);
113 		break;
114 	default:
115 		len = tab->length;
116 		break;
117 	}
118 	mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
119 	devdir(c, q, tab->name, len, eve, tab->perm, dp);
120 	return 1;
121 }
122 
123 
124 static Walkqid*
125 pipewalk(Chan *c, Chan *nc, char **name, int nname)
126 {
127 	Walkqid *wq;
128 	Pipe *p;
129 
130 	wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
131 	if(wq != nil && wq->clone != nil && wq->clone != c){
132 		p = c->aux;
133 		qlock(p);
134 		p->ref++;
135 		if(c->flag & COPEN){
136 			print("channel open in pipewalk\n");
137 			switch(NETTYPE(c->qid.path)){
138 			case Qdata0:
139 				p->qref[0]++;
140 				break;
141 			case Qdata1:
142 				p->qref[1]++;
143 				break;
144 			}
145 		}
146 		qunlock(p);
147 	}
148 	return wq;
149 }
150 
151 static int
152 pipestat(Chan *c, uchar *db, int n)
153 {
154 	Pipe *p;
155 	Dir dir;
156 
157 	p = c->aux;
158 
159 	switch(NETTYPE(c->qid.path)){
160 	case Qdir:
161 		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
162 		break;
163 	case Qdata0:
164 		devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
165 		break;
166 	case Qdata1:
167 		devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
168 		break;
169 	default:
170 		panic("pipestat");
171 	}
172 	n = convD2M(&dir, db, n);
173 	if(n < BIT16SZ)
174 		error(Eshortstat);
175 	return n;
176 }
177 
178 /*
179  *  if the stream doesn't exist, create it
180  */
181 static Chan*
182 pipeopen(Chan *c, int omode)
183 {
184 	Pipe *p;
185 
186 	if(c->qid.type & QTDIR){
187 		if(omode != OREAD)
188 			error(Ebadarg);
189 		c->mode = omode;
190 		c->flag |= COPEN;
191 		c->offset = 0;
192 		return c;
193 	}
194 
195 	p = c->aux;
196 	qlock(p);
197 	switch(NETTYPE(c->qid.path)){
198 	case Qdata0:
199 		p->qref[0]++;
200 		break;
201 	case Qdata1:
202 		p->qref[1]++;
203 		break;
204 	}
205 	qunlock(p);
206 
207 	c->mode = openmode(omode);
208 	c->flag |= COPEN;
209 	c->offset = 0;
210 	c->iounit = qiomaxatomic;
211 	return c;
212 }
213 
214 static void
215 pipeclose(Chan *c)
216 {
217 	Pipe *p;
218 
219 	p = c->aux;
220 	qlock(p);
221 
222 	if(c->flag & COPEN){
223 		/*
224 		 *  closing either side hangs up the stream
225 		 */
226 		switch(NETTYPE(c->qid.path)){
227 		case Qdata0:
228 			p->qref[0]--;
229 			if(p->qref[0] == 0){
230 				qhangup(p->q[1], 0);
231 				qclose(p->q[0]);
232 			}
233 			break;
234 		case Qdata1:
235 			p->qref[1]--;
236 			if(p->qref[1] == 0){
237 				qhangup(p->q[0], 0);
238 				qclose(p->q[1]);
239 			}
240 			break;
241 		}
242 	}
243 
244 
245 	/*
246 	 *  if both sides are closed, they are reusable
247 	 */
248 	if(p->qref[0] == 0 && p->qref[1] == 0){
249 		qreopen(p->q[0]);
250 		qreopen(p->q[1]);
251 	}
252 
253 	/*
254 	 *  free the structure on last close
255 	 */
256 	p->ref--;
257 	if(p->ref == 0){
258 		qunlock(p);
259 		free(p->q[0]);
260 		free(p->q[1]);
261 		free(p);
262 	} else
263 		qunlock(p);
264 }
265 
266 static long
267 piperead(Chan *c, void *va, long n, vlong)
268 {
269 	Pipe *p;
270 
271 	p = c->aux;
272 
273 	switch(NETTYPE(c->qid.path)){
274 	case Qdir:
275 		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
276 	case Qdata0:
277 		return qread(p->q[0], va, n);
278 	case Qdata1:
279 		return qread(p->q[1], va, n);
280 	default:
281 		panic("piperead");
282 	}
283 	return -1;	/* not reached */
284 }
285 
286 static Block*
287 pipebread(Chan *c, long n, ulong offset)
288 {
289 	Pipe *p;
290 
291 	p = c->aux;
292 
293 	switch(NETTYPE(c->qid.path)){
294 	case Qdata0:
295 		return qbread(p->q[0], n);
296 	case Qdata1:
297 		return qbread(p->q[1], n);
298 	}
299 
300 	return devbread(c, n, offset);
301 }
302 
303 /*
304  *  a write to a closed pipe causes a note to be sent to
305  *  the process.
306  */
307 static long
308 pipewrite(Chan *c, void *va, long n, vlong)
309 {
310 	Pipe *p;
311 
312 	if(!islo())
313 		print("pipewrite hi %lux\n", getcallerpc(&c));
314 	if(waserror()) {
315 		/* avoid notes when pipe is a mounted queue */
316 		if((c->flag & CMSG) == 0)
317 			postnote(up, 1, "sys: write on closed pipe", NUser);
318 		nexterror();
319 	}
320 
321 	p = c->aux;
322 
323 	switch(NETTYPE(c->qid.path)){
324 	case Qdata0:
325 		n = qwrite(p->q[1], va, n);
326 		break;
327 
328 	case Qdata1:
329 		n = qwrite(p->q[0], va, n);
330 		break;
331 
332 	default:
333 		panic("pipewrite");
334 	}
335 
336 	poperror();
337 	return n;
338 }
339 
340 static long
341 pipebwrite(Chan *c, Block *bp, ulong)
342 {
343 	long n;
344 	Pipe *p;
345 
346 	if(waserror()) {
347 		/* avoid notes when pipe is a mounted queue */
348 		if((c->flag & CMSG) == 0)
349 			postnote(up, 1, "sys: write on closed pipe", NUser);
350 		nexterror();
351 	}
352 
353 	p = c->aux;
354 	switch(NETTYPE(c->qid.path)){
355 	case Qdata0:
356 		n = qbwrite(p->q[1], bp);
357 		break;
358 
359 	case Qdata1:
360 		n = qbwrite(p->q[0], bp);
361 		break;
362 
363 	default:
364 		n = 0;
365 		panic("pipebwrite");
366 	}
367 
368 	poperror();
369 	return n;
370 }
371 
372 Dev pipedevtab = {
373 	'|',
374 	"pipe",
375 
376 	devreset,
377 	pipeinit,
378 	devshutdown,
379 	pipeattach,
380 	pipewalk,
381 	pipestat,
382 	pipeopen,
383 	devcreate,
384 	pipeclose,
385 	piperead,
386 	pipebread,
387 	pipewrite,
388 	pipebwrite,
389 	devremove,
390 	devwstat,
391 };
392