xref: /plan9/sys/src/9/port/devpipe.c (revision aa72973a2891ccbd3fb042462446761159389e19)
1 #include	"u.h"
2 #include	"../port/lib.h"
3 #include	"mem.h"
4 #include	"dat.h"
5 #include	"fns.h"
6 #include	"../port/error.h"
7 
8 #include	"netif.h"
9 
10 typedef struct Pipe	Pipe;
11 struct Pipe
12 {
13 	QLock;
14 	Pipe	*next;
15 	int	ref;
16 	ulong	path;
17 	long	perm;
18 	Queue	*q[2];
19 	int	qref[2];
20 };
21 
22 struct
23 {
24 	Lock;
25 	ulong	path;
26 } pipealloc;
27 
28 enum
29 {
30 	Qdir,
31 	Qdata0,
32 	Qdata1,
33 };
34 
35 Dirtab pipedir[] =
36 {
37 	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
38 	"data",		{Qdata0},	0,		0600,
39 	"data1",	{Qdata1},	0,		0600,
40 };
41 #define NPIPEDIR 3
42 
43 static void
pipeinit(void)44 pipeinit(void)
45 {
46 	if(conf.pipeqsize == 0){
47 		if(conf.nmach > 1)
48 			conf.pipeqsize = 256*1024;
49 		else
50 			conf.pipeqsize = 32*1024;
51 	}
52 }
53 
54 /*
55  *  create a pipe, no streams are created until an open
56  */
57 static Chan*
pipeattach(char * spec)58 pipeattach(char *spec)
59 {
60 	Pipe *p;
61 	Chan *c;
62 
63 	c = devattach('|', spec);
64 	p = malloc(sizeof(Pipe));
65 	if(p == 0)
66 		exhausted("memory");
67 	p->ref = 1;
68 
69 	p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
70 	if(p->q[0] == 0){
71 		free(p);
72 		exhausted("memory");
73 	}
74 	p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
75 	if(p->q[1] == 0){
76 		free(p->q[0]);
77 		free(p);
78 		exhausted("memory");
79 	}
80 
81 	lock(&pipealloc);
82 	p->path = ++pipealloc.path;
83 	unlock(&pipealloc);
84 	p->perm = pipedir[Qdata0].perm;
85 
86 	mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
87 	c->aux = p;
88 	c->dev = 0;
89 	return c;
90 }
91 
92 static int
pipegen(Chan * c,char *,Dirtab * tab,int ntab,int i,Dir * dp)93 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
94 {
95 	Qid q;
96 	int len;
97 	Pipe *p;
98 
99 	if(i == DEVDOTDOT){
100 		devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
101 		return 1;
102 	}
103 	i++;	/* skip . */
104 	if(tab==0 || i>=ntab)
105 		return -1;
106 
107 	tab += i;
108 	p = c->aux;
109 	switch((ulong)tab->qid.path){
110 	case Qdata0:
111 		len = qlen(p->q[0]);
112 		break;
113 	case Qdata1:
114 		len = qlen(p->q[1]);
115 		break;
116 	default:
117 		len = tab->length;
118 		break;
119 	}
120 	mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
121 	devdir(c, q, tab->name, len, eve, p->perm, dp);
122 	return 1;
123 }
124 
125 
126 static Walkqid*
pipewalk(Chan * c,Chan * nc,char ** name,int nname)127 pipewalk(Chan *c, Chan *nc, char **name, int nname)
128 {
129 	Walkqid *wq;
130 	Pipe *p;
131 
132 	wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
133 	if(wq != nil && wq->clone != nil && wq->clone != c){
134 		p = c->aux;
135 		qlock(p);
136 		p->ref++;
137 		if(c->flag & COPEN){
138 			print("channel open in pipewalk\n");
139 			switch(NETTYPE(c->qid.path)){
140 			case Qdata0:
141 				p->qref[0]++;
142 				break;
143 			case Qdata1:
144 				p->qref[1]++;
145 				break;
146 			}
147 		}
148 		qunlock(p);
149 	}
150 	return wq;
151 }
152 
153 static int
pipestat(Chan * c,uchar * db,int n)154 pipestat(Chan *c, uchar *db, int n)
155 {
156 	Pipe *p;
157 	Dir dir;
158 
159 	p = c->aux;
160 
161 	switch(NETTYPE(c->qid.path)){
162 	case Qdir:
163 		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
164 		break;
165 	case Qdata0:
166 		devdir(c, c->qid, "data", qlen(p->q[0]), eve, p->perm, &dir);
167 		break;
168 	case Qdata1:
169 		devdir(c, c->qid, "data1", qlen(p->q[1]), eve, p->perm, &dir);
170 		break;
171 	default:
172 		panic("pipestat");
173 	}
174 	n = convD2M(&dir, db, n);
175 	if(n < BIT16SZ)
176 		error(Eshortstat);
177 	return n;
178 }
179 
180 static int
pipewstat(Chan * c,uchar * db,int n)181 pipewstat(Chan* c, uchar* db, int n)
182 {
183 	int m;
184 	Dir *dir;
185 	Pipe *p;
186 
187 	p = c->aux;
188 	if(strcmp(up->user, eve) != 0)
189 		error(Eperm);
190 	if(NETTYPE(c->qid.path) == Qdir)
191 		error(Eisdir);
192 
193 	dir = smalloc(sizeof(Dir)+n);
194 	if(waserror()){
195 		free(dir);
196 		nexterror();
197 	}
198 	m = convM2D(db, n, &dir[0], (char*)&dir[1]);
199 	if(m == 0)
200 		error(Eshortstat);
201 	if(!emptystr(dir[0].uid))
202 		error("can't change owner");
203 	if(dir[0].mode != ~0UL)
204 		p->perm = dir[0].mode;
205 	poperror();
206 	free(dir);
207 	return m;
208 }
209 
210 /*
211  *  if the stream doesn't exist, create it
212  */
213 static Chan*
pipeopen(Chan * c,int omode)214 pipeopen(Chan *c, int omode)
215 {
216 	Pipe *p;
217 
218 	if(c->qid.type & QTDIR){
219 		if(omode != OREAD)
220 			error(Ebadarg);
221 		c->mode = omode;
222 		c->flag |= COPEN;
223 		c->offset = 0;
224 		return c;
225 	}
226 
227 	p = c->aux;
228 	qlock(p);
229 	switch(NETTYPE(c->qid.path)){
230 	case Qdata0:
231 		p->qref[0]++;
232 		break;
233 	case Qdata1:
234 		p->qref[1]++;
235 		break;
236 	}
237 	qunlock(p);
238 
239 	c->mode = openmode(omode);
240 	c->flag |= COPEN;
241 	c->offset = 0;
242 	c->iounit = qiomaxatomic;
243 	return c;
244 }
245 
246 static void
pipeclose(Chan * c)247 pipeclose(Chan *c)
248 {
249 	Pipe *p;
250 
251 	p = c->aux;
252 	qlock(p);
253 
254 	if(c->flag & COPEN){
255 		/*
256 		 *  closing either side hangs up the stream
257 		 */
258 		switch(NETTYPE(c->qid.path)){
259 		case Qdata0:
260 			p->qref[0]--;
261 			if(p->qref[0] == 0){
262 				qhangup(p->q[1], 0);
263 				qclose(p->q[0]);
264 			}
265 			break;
266 		case Qdata1:
267 			p->qref[1]--;
268 			if(p->qref[1] == 0){
269 				qhangup(p->q[0], 0);
270 				qclose(p->q[1]);
271 			}
272 			break;
273 		}
274 	}
275 
276 
277 	/*
278 	 *  if both sides are closed, they are reusable
279 	 */
280 	if(p->qref[0] == 0 && p->qref[1] == 0){
281 		qreopen(p->q[0]);
282 		qreopen(p->q[1]);
283 	}
284 
285 	/*
286 	 *  free the structure on last close
287 	 */
288 	p->ref--;
289 	if(p->ref == 0){
290 		qunlock(p);
291 		free(p->q[0]);
292 		free(p->q[1]);
293 		free(p);
294 	} else
295 		qunlock(p);
296 }
297 
298 static long
piperead(Chan * c,void * va,long n,vlong)299 piperead(Chan *c, void *va, long n, vlong)
300 {
301 	Pipe *p;
302 
303 	p = c->aux;
304 
305 	switch(NETTYPE(c->qid.path)){
306 	case Qdir:
307 		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
308 	case Qdata0:
309 		return qread(p->q[0], va, n);
310 	case Qdata1:
311 		return qread(p->q[1], va, n);
312 	default:
313 		panic("piperead");
314 	}
315 	return -1;	/* not reached */
316 }
317 
318 static Block*
pipebread(Chan * c,long n,ulong offset)319 pipebread(Chan *c, long n, ulong offset)
320 {
321 	Pipe *p;
322 
323 	p = c->aux;
324 
325 	switch(NETTYPE(c->qid.path)){
326 	case Qdata0:
327 		return qbread(p->q[0], n);
328 	case Qdata1:
329 		return qbread(p->q[1], n);
330 	}
331 
332 	return devbread(c, n, offset);
333 }
334 
335 /*
336  *  a write to a closed pipe causes a note to be sent to
337  *  the process.
338  */
339 static long
pipewrite(Chan * c,void * va,long n,vlong)340 pipewrite(Chan *c, void *va, long n, vlong)
341 {
342 	Pipe *p;
343 
344 	if(!islo())
345 		print("pipewrite hi %#p\n", getcallerpc(&c));
346 	if(waserror()) {
347 		/* avoid notes when pipe is a mounted queue */
348 		if((c->flag & CMSG) == 0)
349 			postnote(up, 1, "sys: write on closed pipe", NUser);
350 		nexterror();
351 	}
352 
353 	p = c->aux;
354 
355 	switch(NETTYPE(c->qid.path)){
356 	case Qdata0:
357 		n = qwrite(p->q[1], va, n);
358 		break;
359 
360 	case Qdata1:
361 		n = qwrite(p->q[0], va, n);
362 		break;
363 
364 	default:
365 		panic("pipewrite");
366 	}
367 
368 	poperror();
369 	return n;
370 }
371 
372 static long
pipebwrite(Chan * c,Block * bp,ulong)373 pipebwrite(Chan *c, Block *bp, ulong)
374 {
375 	long n;
376 	Pipe *p;
377 
378 	if(waserror()) {
379 		/* avoid notes when pipe is a mounted queue */
380 		if((c->flag & CMSG) == 0)
381 			postnote(up, 1, "sys: write on closed pipe", NUser);
382 		nexterror();
383 	}
384 
385 	p = c->aux;
386 	switch(NETTYPE(c->qid.path)){
387 	case Qdata0:
388 		n = qbwrite(p->q[1], bp);
389 		break;
390 
391 	case Qdata1:
392 		n = qbwrite(p->q[0], bp);
393 		break;
394 
395 	default:
396 		n = 0;
397 		panic("pipebwrite");
398 	}
399 
400 	poperror();
401 	return n;
402 }
403 
404 Dev pipedevtab = {
405 	'|',
406 	"pipe",
407 
408 	devreset,
409 	pipeinit,
410 	devshutdown,
411 	pipeattach,
412 	pipewalk,
413 	pipestat,
414 	pipeopen,
415 	devcreate,
416 	pipeclose,
417 	piperead,
418 	pipebread,
419 	pipewrite,
420 	pipebwrite,
421 	devremove,
422 	pipewstat,
423 };
424