1 #include "u.h"
2 #include "../port/lib.h"
3 #include "mem.h"
4 #include "dat.h"
5 #include "fns.h"
6 #include "../port/error.h"
7
8 typedef struct Pipe Pipe;
9 struct Pipe
10 {
11 QLock;
12 Pipe *next;
13 int ref;
14 ulong path;
15 long perm;
16 Queue *q[2];
17 int qref[2];
18 };
19
20 struct
21 {
22 Lock;
23 ulong path;
24 } pipealloc;
25
26 enum
27 {
28 Qdir,
29 Qdata0,
30 Qdata1,
31
32 PIPEQSIZE = 256*KiB,
33 };
34
35 Dirtab pipedir[] =
36 {
37 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
38 "data", {Qdata0}, 0, 0600,
39 "data1", {Qdata1}, 0, 0600,
40 };
41 #define NPIPEDIR 3
42
43 #define PIPETYPE(x) (((unsigned)x)&0x1f)
44 #define PIPEID(x) ((((unsigned)x))>>5)
45 #define PIPEQID(i, t) ((((unsigned)i)<<5)|(t))
46
47 /*
48 * create a pipe, no streams are created until an open
49 */
50 static Chan*
pipeattach(char * spec)51 pipeattach(char *spec)
52 {
53 Pipe *p;
54 Chan *c;
55
56 c = devattach('|', spec);
57 p = malloc(sizeof(Pipe));
58 if(p == 0)
59 exhausted("memory");
60 p->ref = 1;
61
62 p->q[0] = qopen(PIPEQSIZE, 0, 0, 0);
63 if(p->q[0] == 0){
64 free(p);
65 exhausted("memory");
66 }
67 p->q[1] = qopen(PIPEQSIZE, 0, 0, 0);
68 if(p->q[1] == 0){
69 free(p->q[0]);
70 free(p);
71 exhausted("memory");
72 }
73
74 lock(&pipealloc);
75 p->path = ++pipealloc.path;
76 unlock(&pipealloc);
77 p->perm = pipedir[Qdata0].perm;
78
79 mkqid(&c->qid, PIPEQID(2*p->path, Qdir), 0, QTDIR);
80 c->aux = p;
81 c->devno = 0;
82 return c;
83 }
84
85 static int
pipegen(Chan * c,char *,Dirtab * tab,int ntab,int i,Dir * dp)86 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
87 {
88 Qid q;
89 int len;
90 Pipe *p;
91
92 if(i == DEVDOTDOT){
93 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
94 return 1;
95 }
96 i++; /* skip . */
97 if(tab==0 || i>=ntab)
98 return -1;
99
100 tab += i;
101 p = c->aux;
102 switch((ulong)tab->qid.path){
103 case Qdata0:
104 len = qlen(p->q[0]);
105 break;
106 case Qdata1:
107 len = qlen(p->q[1]);
108 break;
109 default:
110 len = tab->length;
111 break;
112 }
113 mkqid(&q, PIPEQID(PIPEID(c->qid.path), tab->qid.path), 0, QTFILE);
114 devdir(c, q, tab->name, len, eve, p->perm, dp);
115 return 1;
116 }
117
118
119 static Walkqid*
pipewalk(Chan * c,Chan * nc,char ** name,int nname)120 pipewalk(Chan *c, Chan *nc, char **name, int nname)
121 {
122 Walkqid *wq;
123 Pipe *p;
124
125 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
126 if(wq != nil && wq->clone != nil && wq->clone != c){
127 p = c->aux;
128 qlock(p);
129 p->ref++;
130 if(c->flag & COPEN){
131 print("channel open in pipewalk\n");
132 switch(PIPETYPE(c->qid.path)){
133 case Qdata0:
134 p->qref[0]++;
135 break;
136 case Qdata1:
137 p->qref[1]++;
138 break;
139 }
140 }
141 qunlock(p);
142 }
143 return wq;
144 }
145
146 static long
pipestat(Chan * c,uchar * db,long n)147 pipestat(Chan *c, uchar *db, long n)
148 {
149 Pipe *p;
150 Dir dir;
151
152 p = c->aux;
153
154 switch(PIPETYPE(c->qid.path)){
155 case Qdir:
156 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
157 break;
158 case Qdata0:
159 devdir(c, c->qid, "data", qlen(p->q[0]), eve, p->perm, &dir);
160 break;
161 case Qdata1:
162 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, p->perm, &dir);
163 break;
164 default:
165 panic("pipestat");
166 }
167 n = convD2M(&dir, db, n);
168 if(n < BIT16SZ)
169 error(Eshortstat);
170 return n;
171 }
172
173 static long
pipewstat(Chan * c,uchar * db,long n)174 pipewstat(Chan* c, uchar* db, long n)
175 {
176 int m;
177 Dir *dir;
178 Pipe *p;
179
180 p = c->aux;
181 if(strcmp(up->user, eve) != 0)
182 error(Eperm);
183 if(PIPETYPE(c->qid.path) == Qdir)
184 error(Eisdir);
185
186 dir = smalloc(sizeof(Dir)+n);
187 if(waserror()){
188 free(dir);
189 nexterror();
190 }
191 m = convM2D(db, n, &dir[0], (char*)&dir[1]);
192 if(m == 0)
193 error(Eshortstat);
194 if(!emptystr(dir[0].uid))
195 error("can't change owner");
196 if(dir[0].mode != ~0UL)
197 p->perm = dir[0].mode;
198 poperror();
199 free(dir);
200 return m;
201 }
202
203 /*
204 * if the stream doesn't exist, create it
205 */
206 static Chan*
pipeopen(Chan * c,int omode)207 pipeopen(Chan *c, int omode)
208 {
209 Pipe *p;
210
211 if(c->qid.type & QTDIR){
212 if(omode != OREAD)
213 error(Ebadarg);
214 c->mode = omode;
215 c->flag |= COPEN;
216 c->offset = 0;
217 return c;
218 }
219
220 p = c->aux;
221 qlock(p);
222 switch(PIPETYPE(c->qid.path)){
223 case Qdata0:
224 p->qref[0]++;
225 break;
226 case Qdata1:
227 p->qref[1]++;
228 break;
229 }
230 qunlock(p);
231
232 c->mode = openmode(omode);
233 c->flag |= COPEN;
234 c->offset = 0;
235 c->iounit = qiomaxatomic;
236 return c;
237 }
238
239 static void
pipeclose(Chan * c)240 pipeclose(Chan *c)
241 {
242 Pipe *p;
243
244 p = c->aux;
245 qlock(p);
246
247 if(c->flag & COPEN){
248 /*
249 * closing either side hangs up the stream
250 */
251 switch(PIPETYPE(c->qid.path)){
252 case Qdata0:
253 p->qref[0]--;
254 if(p->qref[0] == 0){
255 qhangup(p->q[1], 0);
256 qclose(p->q[0]);
257 }
258 break;
259 case Qdata1:
260 p->qref[1]--;
261 if(p->qref[1] == 0){
262 qhangup(p->q[0], 0);
263 qclose(p->q[1]);
264 }
265 break;
266 }
267 }
268
269
270 /*
271 * if both sides are closed, they are reusable
272 */
273 if(p->qref[0] == 0 && p->qref[1] == 0){
274 qreopen(p->q[0]);
275 qreopen(p->q[1]);
276 }
277
278 /*
279 * free the structure on last close
280 */
281 p->ref--;
282 if(p->ref == 0){
283 qunlock(p);
284 free(p->q[0]);
285 free(p->q[1]);
286 free(p);
287 } else
288 qunlock(p);
289 }
290
291 static long
piperead(Chan * c,void * va,long n,vlong)292 piperead(Chan *c, void *va, long n, vlong)
293 {
294 Pipe *p;
295
296 p = c->aux;
297
298 switch(PIPETYPE(c->qid.path)){
299 case Qdir:
300 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
301 case Qdata0:
302 return qread(p->q[0], va, n);
303 case Qdata1:
304 return qread(p->q[1], va, n);
305 default:
306 panic("piperead");
307 }
308 return -1; /* not reached */
309 }
310
311 static Block*
pipebread(Chan * c,long n,vlong offset)312 pipebread(Chan *c, long n, vlong offset)
313 {
314 Pipe *p;
315
316 p = c->aux;
317
318 switch(PIPETYPE(c->qid.path)){
319 case Qdata0:
320 return qbread(p->q[0], n);
321 case Qdata1:
322 return qbread(p->q[1], n);
323 }
324
325 return devbread(c, n, offset);
326 }
327
328 /*
329 * a write to a closed pipe causes a note to be sent to
330 * the process.
331 */
332 static long
pipewrite(Chan * c,void * va,long n,vlong)333 pipewrite(Chan *c, void *va, long n, vlong)
334 {
335 Pipe *p;
336
337 if(!islo())
338 print("pipewrite hi %#p\n", getcallerpc(&c));
339 if(waserror()) {
340 /* avoid notes when pipe is a mounted queue */
341 if((c->flag & CMSG) == 0)
342 postnote(up, 1, "sys: write on closed pipe", NUser);
343 nexterror();
344 }
345
346 p = c->aux;
347
348 switch(PIPETYPE(c->qid.path)){
349 case Qdata0:
350 n = qwrite(p->q[1], va, n);
351 break;
352
353 case Qdata1:
354 n = qwrite(p->q[0], va, n);
355 break;
356
357 default:
358 panic("pipewrite");
359 }
360
361 poperror();
362 return n;
363 }
364
365 static long
pipebwrite(Chan * c,Block * bp,vlong)366 pipebwrite(Chan *c, Block *bp, vlong)
367 {
368 long n;
369 Pipe *p;
370
371 if(waserror()) {
372 /* avoid notes when pipe is a mounted queue */
373 if((c->flag & CMSG) == 0)
374 postnote(up, 1, "sys: write on closed pipe", NUser);
375 nexterror();
376 }
377
378 p = c->aux;
379 switch(PIPETYPE(c->qid.path)){
380 case Qdata0:
381 n = qbwrite(p->q[1], bp);
382 break;
383
384 case Qdata1:
385 n = qbwrite(p->q[0], bp);
386 break;
387
388 default:
389 n = 0;
390 panic("pipebwrite");
391 }
392
393 poperror();
394 return n;
395 }
396
397 Dev pipedevtab = {
398 '|',
399 "pipe",
400
401 devreset,
402 devinit,
403 devshutdown,
404 pipeattach,
405 pipewalk,
406 pipestat,
407 pipeopen,
408 devcreate,
409 pipeclose,
410 piperead,
411 pipebread,
412 pipewrite,
413 pipebwrite,
414 devremove,
415 pipewstat,
416 };
417