xref: /plan9-contrib/sys/src/9k/port/devpipe.c (revision 406c76facc4b13aa2a55454bf4091aab9f03da22)
1 #include	"u.h"
2 #include	"../port/lib.h"
3 #include	"mem.h"
4 #include	"dat.h"
5 #include	"fns.h"
6 #include	"../port/error.h"
7 
8 typedef struct Pipe	Pipe;
9 struct Pipe
10 {
11 	QLock;
12 	Pipe	*next;
13 	int	ref;
14 	ulong	path;
15 	long	perm;
16 	Queue	*q[2];
17 	int	qref[2];
18 };
19 
20 struct
21 {
22 	Lock;
23 	ulong	path;
24 } pipealloc;
25 
26 enum
27 {
28 	Qdir,
29 	Qdata0,
30 	Qdata1,
31 
32 	PIPEQSIZE = 256*KiB,
33 };
34 
35 Dirtab pipedir[] =
36 {
37 	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
38 	"data",		{Qdata0},	0,		0600,
39 	"data1",	{Qdata1},	0,		0600,
40 };
41 #define NPIPEDIR 3
42 
43 #define PIPETYPE(x)	(((unsigned)x)&0x1f)
44 #define PIPEID(x)	((((unsigned)x))>>5)
45 #define PIPEQID(i, t)	((((unsigned)i)<<5)|(t))
46 
47 /*
48  *  create a pipe, no streams are created until an open
49  */
50 static Chan*
pipeattach(char * spec)51 pipeattach(char *spec)
52 {
53 	Pipe *p;
54 	Chan *c;
55 
56 	c = devattach('|', spec);
57 	p = malloc(sizeof(Pipe));
58 	if(p == 0)
59 		exhausted("memory");
60 	p->ref = 1;
61 
62 	p->q[0] = qopen(PIPEQSIZE, 0, 0, 0);
63 	if(p->q[0] == 0){
64 		free(p);
65 		exhausted("memory");
66 	}
67 	p->q[1] = qopen(PIPEQSIZE, 0, 0, 0);
68 	if(p->q[1] == 0){
69 		free(p->q[0]);
70 		free(p);
71 		exhausted("memory");
72 	}
73 
74 	lock(&pipealloc);
75 	p->path = ++pipealloc.path;
76 	unlock(&pipealloc);
77 	p->perm = pipedir[Qdata0].perm;
78 
79 	mkqid(&c->qid, PIPEQID(2*p->path, Qdir), 0, QTDIR);
80 	c->aux = p;
81 	c->devno = 0;
82 	return c;
83 }
84 
85 static int
pipegen(Chan * c,char *,Dirtab * tab,int ntab,int i,Dir * dp)86 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
87 {
88 	Qid q;
89 	int len;
90 	Pipe *p;
91 
92 	if(i == DEVDOTDOT){
93 		devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
94 		return 1;
95 	}
96 	i++;	/* skip . */
97 	if(tab==0 || i>=ntab)
98 		return -1;
99 
100 	tab += i;
101 	p = c->aux;
102 	switch((ulong)tab->qid.path){
103 	case Qdata0:
104 		len = qlen(p->q[0]);
105 		break;
106 	case Qdata1:
107 		len = qlen(p->q[1]);
108 		break;
109 	default:
110 		len = tab->length;
111 		break;
112 	}
113 	mkqid(&q, PIPEQID(PIPEID(c->qid.path), tab->qid.path), 0, QTFILE);
114 	devdir(c, q, tab->name, len, eve, p->perm, dp);
115 	return 1;
116 }
117 
118 
119 static Walkqid*
pipewalk(Chan * c,Chan * nc,char ** name,int nname)120 pipewalk(Chan *c, Chan *nc, char **name, int nname)
121 {
122 	Walkqid *wq;
123 	Pipe *p;
124 
125 	wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
126 	if(wq != nil && wq->clone != nil && wq->clone != c){
127 		p = c->aux;
128 		qlock(p);
129 		p->ref++;
130 		if(c->flag & COPEN){
131 			print("channel open in pipewalk\n");
132 			switch(PIPETYPE(c->qid.path)){
133 			case Qdata0:
134 				p->qref[0]++;
135 				break;
136 			case Qdata1:
137 				p->qref[1]++;
138 				break;
139 			}
140 		}
141 		qunlock(p);
142 	}
143 	return wq;
144 }
145 
146 static long
pipestat(Chan * c,uchar * db,long n)147 pipestat(Chan *c, uchar *db, long n)
148 {
149 	Pipe *p;
150 	Dir dir;
151 
152 	p = c->aux;
153 
154 	switch(PIPETYPE(c->qid.path)){
155 	case Qdir:
156 		devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
157 		break;
158 	case Qdata0:
159 		devdir(c, c->qid, "data", qlen(p->q[0]), eve, p->perm, &dir);
160 		break;
161 	case Qdata1:
162 		devdir(c, c->qid, "data1", qlen(p->q[1]), eve, p->perm, &dir);
163 		break;
164 	default:
165 		panic("pipestat");
166 	}
167 	n = convD2M(&dir, db, n);
168 	if(n < BIT16SZ)
169 		error(Eshortstat);
170 	return n;
171 }
172 
173 static long
pipewstat(Chan * c,uchar * db,long n)174 pipewstat(Chan* c, uchar* db, long n)
175 {
176 	int m;
177 	Dir *dir;
178 	Pipe *p;
179 
180 	p = c->aux;
181 	if(strcmp(up->user, eve) != 0)
182 		error(Eperm);
183 	if(PIPETYPE(c->qid.path) == Qdir)
184 		error(Eisdir);
185 
186 	dir = smalloc(sizeof(Dir)+n);
187 	if(waserror()){
188 		free(dir);
189 		nexterror();
190 	}
191 	m = convM2D(db, n, &dir[0], (char*)&dir[1]);
192 	if(m == 0)
193 		error(Eshortstat);
194 	if(!emptystr(dir[0].uid))
195 		error("can't change owner");
196 	if(dir[0].mode != ~0UL)
197 		p->perm = dir[0].mode;
198 	poperror();
199 	free(dir);
200 	return m;
201 }
202 
203 /*
204  *  if the stream doesn't exist, create it
205  */
206 static Chan*
pipeopen(Chan * c,int omode)207 pipeopen(Chan *c, int omode)
208 {
209 	Pipe *p;
210 
211 	if(c->qid.type & QTDIR){
212 		if(omode != OREAD)
213 			error(Ebadarg);
214 		c->mode = omode;
215 		c->flag |= COPEN;
216 		c->offset = 0;
217 		return c;
218 	}
219 
220 	p = c->aux;
221 	qlock(p);
222 	switch(PIPETYPE(c->qid.path)){
223 	case Qdata0:
224 		p->qref[0]++;
225 		break;
226 	case Qdata1:
227 		p->qref[1]++;
228 		break;
229 	}
230 	qunlock(p);
231 
232 	c->mode = openmode(omode);
233 	c->flag |= COPEN;
234 	c->offset = 0;
235 	c->iounit = qiomaxatomic;
236 	return c;
237 }
238 
239 static void
pipeclose(Chan * c)240 pipeclose(Chan *c)
241 {
242 	Pipe *p;
243 
244 	p = c->aux;
245 	qlock(p);
246 
247 	if(c->flag & COPEN){
248 		/*
249 		 *  closing either side hangs up the stream
250 		 */
251 		switch(PIPETYPE(c->qid.path)){
252 		case Qdata0:
253 			p->qref[0]--;
254 			if(p->qref[0] == 0){
255 				qhangup(p->q[1], 0);
256 				qclose(p->q[0]);
257 			}
258 			break;
259 		case Qdata1:
260 			p->qref[1]--;
261 			if(p->qref[1] == 0){
262 				qhangup(p->q[0], 0);
263 				qclose(p->q[1]);
264 			}
265 			break;
266 		}
267 	}
268 
269 
270 	/*
271 	 *  if both sides are closed, they are reusable
272 	 */
273 	if(p->qref[0] == 0 && p->qref[1] == 0){
274 		qreopen(p->q[0]);
275 		qreopen(p->q[1]);
276 	}
277 
278 	/*
279 	 *  free the structure on last close
280 	 */
281 	p->ref--;
282 	if(p->ref == 0){
283 		qunlock(p);
284 		free(p->q[0]);
285 		free(p->q[1]);
286 		free(p);
287 	} else
288 		qunlock(p);
289 }
290 
291 static long
piperead(Chan * c,void * va,long n,vlong)292 piperead(Chan *c, void *va, long n, vlong)
293 {
294 	Pipe *p;
295 
296 	p = c->aux;
297 
298 	switch(PIPETYPE(c->qid.path)){
299 	case Qdir:
300 		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
301 	case Qdata0:
302 		return qread(p->q[0], va, n);
303 	case Qdata1:
304 		return qread(p->q[1], va, n);
305 	default:
306 		panic("piperead");
307 	}
308 	return -1;	/* not reached */
309 }
310 
311 static Block*
pipebread(Chan * c,long n,vlong offset)312 pipebread(Chan *c, long n, vlong offset)
313 {
314 	Pipe *p;
315 
316 	p = c->aux;
317 
318 	switch(PIPETYPE(c->qid.path)){
319 	case Qdata0:
320 		return qbread(p->q[0], n);
321 	case Qdata1:
322 		return qbread(p->q[1], n);
323 	}
324 
325 	return devbread(c, n, offset);
326 }
327 
328 /*
329  *  a write to a closed pipe causes a note to be sent to
330  *  the process.
331  */
332 static long
pipewrite(Chan * c,void * va,long n,vlong)333 pipewrite(Chan *c, void *va, long n, vlong)
334 {
335 	Pipe *p;
336 
337 	if(!islo())
338 		print("pipewrite hi %#p\n", getcallerpc(&c));
339 	if(waserror()) {
340 		/* avoid notes when pipe is a mounted queue */
341 		if((c->flag & CMSG) == 0)
342 			postnote(up, 1, "sys: write on closed pipe", NUser);
343 		nexterror();
344 	}
345 
346 	p = c->aux;
347 
348 	switch(PIPETYPE(c->qid.path)){
349 	case Qdata0:
350 		n = qwrite(p->q[1], va, n);
351 		break;
352 
353 	case Qdata1:
354 		n = qwrite(p->q[0], va, n);
355 		break;
356 
357 	default:
358 		panic("pipewrite");
359 	}
360 
361 	poperror();
362 	return n;
363 }
364 
365 static long
pipebwrite(Chan * c,Block * bp,vlong)366 pipebwrite(Chan *c, Block *bp, vlong)
367 {
368 	long n;
369 	Pipe *p;
370 
371 	if(waserror()) {
372 		/* avoid notes when pipe is a mounted queue */
373 		if((c->flag & CMSG) == 0)
374 			postnote(up, 1, "sys: write on closed pipe", NUser);
375 		nexterror();
376 	}
377 
378 	p = c->aux;
379 	switch(PIPETYPE(c->qid.path)){
380 	case Qdata0:
381 		n = qbwrite(p->q[1], bp);
382 		break;
383 
384 	case Qdata1:
385 		n = qbwrite(p->q[0], bp);
386 		break;
387 
388 	default:
389 		n = 0;
390 		panic("pipebwrite");
391 	}
392 
393 	poperror();
394 	return n;
395 }
396 
397 Dev pipedevtab = {
398 	'|',
399 	"pipe",
400 
401 	devreset,
402 	devinit,
403 	devshutdown,
404 	pipeattach,
405 	pipewalk,
406 	pipestat,
407 	pipeopen,
408 	devcreate,
409 	pipeclose,
410 	piperead,
411 	pipebread,
412 	pipewrite,
413 	pipebwrite,
414 	devremove,
415 	pipewstat,
416 };
417