1 #include "u.h"
2 #include "../port/lib.h"
3 #include "mem.h"
4 #include "dat.h"
5 #include "fns.h"
6 #include "../port/error.h"
7
8 #include "netif.h"
9
/*
 * One pipe: a pair of queues, one per direction, shared by every
 * channel that refers to the pipe's directory or data files.
 */
typedef struct Pipe Pipe;
struct Pipe
{
	QLock;			/* held while ref and qref are updated (walk, open, close) */
	Pipe	*next;		/* not referenced in the visible part of this file */
	int	ref;		/* channels referring to this pipe; freed when it reaches 0 */
	ulong	path;		/* unique id assigned by pipeattach, encoded in qid paths */
	long	perm;		/* permissions reported for the data files */
	Queue	*q[2];		/* q[0] is read through data, q[1] through data1 */
	int	qref[2];	/* open counts for data (0) and data1 (1) */
};
21
/* source of unique pipe ids; the lock is held while path is advanced */
struct
{
	Lock;
	ulong	path;	/* most recent id handed out by pipeattach */
} pipealloc;
27
/*
 * file types inside a pipe directory; the values are also used as
 * indices into pipedir[]
 */
enum
{
	Qdir,		/* the directory itself */
	Qdata0,		/* "data" */
	Qdata1,		/* "data1" */
};
34
/*
 * directory template, indexed by file type.  pipegen substitutes the
 * per-pipe queue length and permissions for the data files.
 */
Dirtab pipedir[] =
{
	".",		{Qdir,0,QTDIR},	0,		DMDIR|0500,
	"data",		{Qdata0},	0,		0600,
	"data1",	{Qdata1},	0,		0600,
};
#define NPIPEDIR 3	/* number of entries in pipedir[] */
42
43 static void
pipeinit(void)44 pipeinit(void)
45 {
46 if(conf.pipeqsize == 0){
47 if(conf.nmach > 1)
48 conf.pipeqsize = 256*1024;
49 else
50 conf.pipeqsize = 32*1024;
51 }
52 }
53
54 /*
55 * create a pipe, no streams are created until an open
56 */
57 static Chan*
pipeattach(char * spec)58 pipeattach(char *spec)
59 {
60 Pipe *p;
61 Chan *c;
62
63 c = devattach('|', spec);
64 p = malloc(sizeof(Pipe));
65 if(p == 0)
66 exhausted("memory");
67 p->ref = 1;
68
69 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
70 if(p->q[0] == 0){
71 free(p);
72 exhausted("memory");
73 }
74 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
75 if(p->q[1] == 0){
76 free(p->q[0]);
77 free(p);
78 exhausted("memory");
79 }
80
81 lock(&pipealloc);
82 p->path = ++pipealloc.path;
83 unlock(&pipealloc);
84 p->perm = pipedir[Qdata0].perm;
85
86 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
87 c->aux = p;
88 c->dev = 0;
89 return c;
90 }
91
92 static int
pipegen(Chan * c,char *,Dirtab * tab,int ntab,int i,Dir * dp)93 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp)
94 {
95 Qid q;
96 int len;
97 Pipe *p;
98
99 if(i == DEVDOTDOT){
100 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
101 return 1;
102 }
103 i++; /* skip . */
104 if(tab==0 || i>=ntab)
105 return -1;
106
107 tab += i;
108 p = c->aux;
109 switch((ulong)tab->qid.path){
110 case Qdata0:
111 len = qlen(p->q[0]);
112 break;
113 case Qdata1:
114 len = qlen(p->q[1]);
115 break;
116 default:
117 len = tab->length;
118 break;
119 }
120 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
121 devdir(c, q, tab->name, len, eve, p->perm, dp);
122 return 1;
123 }
124
125
126 static Walkqid*
pipewalk(Chan * c,Chan * nc,char ** name,int nname)127 pipewalk(Chan *c, Chan *nc, char **name, int nname)
128 {
129 Walkqid *wq;
130 Pipe *p;
131
132 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
133 if(wq != nil && wq->clone != nil && wq->clone != c){
134 p = c->aux;
135 qlock(p);
136 p->ref++;
137 if(c->flag & COPEN){
138 print("channel open in pipewalk\n");
139 switch(NETTYPE(c->qid.path)){
140 case Qdata0:
141 p->qref[0]++;
142 break;
143 case Qdata1:
144 p->qref[1]++;
145 break;
146 }
147 }
148 qunlock(p);
149 }
150 return wq;
151 }
152
153 static int
pipestat(Chan * c,uchar * db,int n)154 pipestat(Chan *c, uchar *db, int n)
155 {
156 Pipe *p;
157 Dir dir;
158
159 p = c->aux;
160
161 switch(NETTYPE(c->qid.path)){
162 case Qdir:
163 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
164 break;
165 case Qdata0:
166 devdir(c, c->qid, "data", qlen(p->q[0]), eve, p->perm, &dir);
167 break;
168 case Qdata1:
169 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, p->perm, &dir);
170 break;
171 default:
172 panic("pipestat");
173 }
174 n = convD2M(&dir, db, n);
175 if(n < BIT16SZ)
176 error(Eshortstat);
177 return n;
178 }
179
180 static int
pipewstat(Chan * c,uchar * db,int n)181 pipewstat(Chan* c, uchar* db, int n)
182 {
183 int m;
184 Dir *dir;
185 Pipe *p;
186
187 p = c->aux;
188 if(strcmp(up->user, eve) != 0)
189 error(Eperm);
190 if(NETTYPE(c->qid.path) == Qdir)
191 error(Eisdir);
192
193 dir = smalloc(sizeof(Dir)+n);
194 if(waserror()){
195 free(dir);
196 nexterror();
197 }
198 m = convM2D(db, n, &dir[0], (char*)&dir[1]);
199 if(m == 0)
200 error(Eshortstat);
201 if(!emptystr(dir[0].uid))
202 error("can't change owner");
203 if(dir[0].mode != ~0UL)
204 p->perm = dir[0].mode;
205 poperror();
206 free(dir);
207 return m;
208 }
209
210 /*
211 * if the stream doesn't exist, create it
212 */
213 static Chan*
pipeopen(Chan * c,int omode)214 pipeopen(Chan *c, int omode)
215 {
216 Pipe *p;
217
218 if(c->qid.type & QTDIR){
219 if(omode != OREAD)
220 error(Ebadarg);
221 c->mode = omode;
222 c->flag |= COPEN;
223 c->offset = 0;
224 return c;
225 }
226
227 p = c->aux;
228 qlock(p);
229 switch(NETTYPE(c->qid.path)){
230 case Qdata0:
231 p->qref[0]++;
232 break;
233 case Qdata1:
234 p->qref[1]++;
235 break;
236 }
237 qunlock(p);
238
239 c->mode = openmode(omode);
240 c->flag |= COPEN;
241 c->offset = 0;
242 c->iounit = qiomaxatomic;
243 return c;
244 }
245
246 static void
pipeclose(Chan * c)247 pipeclose(Chan *c)
248 {
249 Pipe *p;
250
251 p = c->aux;
252 qlock(p);
253
254 if(c->flag & COPEN){
255 /*
256 * closing either side hangs up the stream
257 */
258 switch(NETTYPE(c->qid.path)){
259 case Qdata0:
260 p->qref[0]--;
261 if(p->qref[0] == 0){
262 qhangup(p->q[1], 0);
263 qclose(p->q[0]);
264 }
265 break;
266 case Qdata1:
267 p->qref[1]--;
268 if(p->qref[1] == 0){
269 qhangup(p->q[0], 0);
270 qclose(p->q[1]);
271 }
272 break;
273 }
274 }
275
276
277 /*
278 * if both sides are closed, they are reusable
279 */
280 if(p->qref[0] == 0 && p->qref[1] == 0){
281 qreopen(p->q[0]);
282 qreopen(p->q[1]);
283 }
284
285 /*
286 * free the structure on last close
287 */
288 p->ref--;
289 if(p->ref == 0){
290 qunlock(p);
291 free(p->q[0]);
292 free(p->q[1]);
293 free(p);
294 } else
295 qunlock(p);
296 }
297
298 static long
piperead(Chan * c,void * va,long n,vlong)299 piperead(Chan *c, void *va, long n, vlong)
300 {
301 Pipe *p;
302
303 p = c->aux;
304
305 switch(NETTYPE(c->qid.path)){
306 case Qdir:
307 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
308 case Qdata0:
309 return qread(p->q[0], va, n);
310 case Qdata1:
311 return qread(p->q[1], va, n);
312 default:
313 panic("piperead");
314 }
315 return -1; /* not reached */
316 }
317
318 static Block*
pipebread(Chan * c,long n,ulong offset)319 pipebread(Chan *c, long n, ulong offset)
320 {
321 Pipe *p;
322
323 p = c->aux;
324
325 switch(NETTYPE(c->qid.path)){
326 case Qdata0:
327 return qbread(p->q[0], n);
328 case Qdata1:
329 return qbread(p->q[1], n);
330 }
331
332 return devbread(c, n, offset);
333 }
334
335 /*
336 * a write to a closed pipe causes a note to be sent to
337 * the process.
338 */
339 static long
pipewrite(Chan * c,void * va,long n,vlong)340 pipewrite(Chan *c, void *va, long n, vlong)
341 {
342 Pipe *p;
343
344 if(!islo())
345 print("pipewrite hi %#p\n", getcallerpc(&c));
346 if(waserror()) {
347 /* avoid notes when pipe is a mounted queue */
348 if((c->flag & CMSG) == 0)
349 postnote(up, 1, "sys: write on closed pipe", NUser);
350 nexterror();
351 }
352
353 p = c->aux;
354
355 switch(NETTYPE(c->qid.path)){
356 case Qdata0:
357 n = qwrite(p->q[1], va, n);
358 break;
359
360 case Qdata1:
361 n = qwrite(p->q[0], va, n);
362 break;
363
364 default:
365 panic("pipewrite");
366 }
367
368 poperror();
369 return n;
370 }
371
372 static long
pipebwrite(Chan * c,Block * bp,ulong)373 pipebwrite(Chan *c, Block *bp, ulong)
374 {
375 long n;
376 Pipe *p;
377
378 if(waserror()) {
379 /* avoid notes when pipe is a mounted queue */
380 if((c->flag & CMSG) == 0)
381 postnote(up, 1, "sys: write on closed pipe", NUser);
382 nexterror();
383 }
384
385 p = c->aux;
386 switch(NETTYPE(c->qid.path)){
387 case Qdata0:
388 n = qbwrite(p->q[1], bp);
389 break;
390
391 case Qdata1:
392 n = qbwrite(p->q[0], bp);
393 break;
394
395 default:
396 n = 0;
397 panic("pipebwrite");
398 }
399
400 poperror();
401 return n;
402 }
403
/*
 * device table entry for '|' ("pipe").  The initializers are
 * positional: pipe-specific handlers where this file defines one,
 * the generic dev* routines everywhere else.
 */
Dev pipedevtab = {
	'|',
	"pipe",

	devreset,
	pipeinit,
	devshutdown,
	pipeattach,
	pipewalk,
	pipestat,
	pipeopen,
	devcreate,
	pipeclose,
	piperead,
	pipebread,
	pipewrite,
	pipebwrite,
	devremove,
	pipewstat,
};
424