1 #include "u.h"
2 #include "lib.h"
3 #include "dat.h"
4 #include "fns.h"
5 #include "error.h"
6
7 #include "netif.h"
8
9 typedef struct Pipe Pipe;
10 struct Pipe
11 {
12 QLock lk;
13 Pipe *next;
14 int ref;
15 ulong path;
16 Queue *q[2];
17 int qref[2];
18 };
19
20 struct
21 {
22 Lock lk;
23 ulong path;
24 } pipealloc;
25
26 enum
27 {
28 Qdir,
29 Qdata0,
30 Qdata1,
31 };
32
33 Dirtab pipedir[] =
34 {
35 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500,
36 "data", {Qdata0}, 0, 0600,
37 "data1", {Qdata1}, 0, 0600,
38 };
39 #define NPIPEDIR 3
40
41 static void
pipeinit(void)42 pipeinit(void)
43 {
44 if(conf.pipeqsize == 0){
45 if(conf.nmach > 1)
46 conf.pipeqsize = 256*1024;
47 else
48 conf.pipeqsize = 32*1024;
49 }
50 }
51
52 /*
53 * create a pipe, no streams are created until an open
54 */
55 static Chan*
pipeattach(char * spec)56 pipeattach(char *spec)
57 {
58 Pipe *p;
59 Chan *c;
60
61 c = devattach('|', spec);
62 p = malloc(sizeof(Pipe));
63 if(p == 0)
64 exhausted("memory");
65 p->ref = 1;
66
67 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0);
68 if(p->q[0] == 0){
69 free(p);
70 exhausted("memory");
71 }
72 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0);
73 if(p->q[1] == 0){
74 free(p->q[0]);
75 free(p);
76 exhausted("memory");
77 }
78
79 lock(&pipealloc.lk);
80 p->path = ++pipealloc.path;
81 unlock(&pipealloc.lk);
82
83 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR);
84 c->aux = p;
85 c->dev = 0;
86 return c;
87 }
88
89 static int
pipegen(Chan * c,char * name,Dirtab * tab,int ntab,int i,Dir * dp)90 pipegen(Chan *c, char *name, Dirtab *tab, int ntab, int i, Dir *dp)
91 {
92 Qid q;
93 int len;
94 Pipe *p;
95
96 USED(name);
97
98 if(i == DEVDOTDOT){
99 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp);
100 return 1;
101 }
102 i++; /* skip . */
103 if(tab==0 || i>=ntab)
104 return -1;
105
106 tab += i;
107 p = c->aux;
108 switch((ulong)tab->qid.path){
109 case Qdata0:
110 len = qlen(p->q[0]);
111 break;
112 case Qdata1:
113 len = qlen(p->q[1]);
114 break;
115 default:
116 len = tab->length;
117 break;
118 }
119 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE);
120 devdir(c, q, tab->name, len, eve, tab->perm, dp);
121 return 1;
122 }
123
124
125 static Walkqid*
pipewalk(Chan * c,Chan * nc,char ** name,int nname)126 pipewalk(Chan *c, Chan *nc, char **name, int nname)
127 {
128 Walkqid *wq;
129 Pipe *p;
130
131 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen);
132 if(wq != nil && wq->clone != nil && wq->clone != c){
133 p = c->aux;
134 qlock(&p->lk);
135 p->ref++;
136 if(c->flag & COPEN){
137 print("channel open in pipewalk\n");
138 switch(NETTYPE(c->qid.path)){
139 case Qdata0:
140 p->qref[0]++;
141 break;
142 case Qdata1:
143 p->qref[1]++;
144 break;
145 }
146 }
147 qunlock(&p->lk);
148 }
149 return wq;
150 }
151
152 static int
pipestat(Chan * c,uchar * db,int n)153 pipestat(Chan *c, uchar *db, int n)
154 {
155 Pipe *p;
156 Dir dir;
157
158 p = c->aux;
159
160 switch(NETTYPE(c->qid.path)){
161 case Qdir:
162 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir);
163 break;
164 case Qdata0:
165 devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir);
166 break;
167 case Qdata1:
168 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir);
169 break;
170 default:
171 panic("pipestat");
172 }
173 n = convD2M(&dir, db, n);
174 if(n < BIT16SZ)
175 error(Eshortstat);
176 return n;
177 }
178
179 /*
180 * if the stream doesn't exist, create it
181 */
182 static Chan*
pipeopen(Chan * c,int omode)183 pipeopen(Chan *c, int omode)
184 {
185 Pipe *p;
186
187 if(c->qid.type & QTDIR){
188 if(omode != OREAD)
189 error(Ebadarg);
190 c->mode = omode;
191 c->flag |= COPEN;
192 c->offset = 0;
193 return c;
194 }
195
196 p = c->aux;
197 qlock(&p->lk);
198 switch(NETTYPE(c->qid.path)){
199 case Qdata0:
200 p->qref[0]++;
201 break;
202 case Qdata1:
203 p->qref[1]++;
204 break;
205 }
206 qunlock(&p->lk);
207
208 c->mode = openmode(omode);
209 c->flag |= COPEN;
210 c->offset = 0;
211 c->iounit = qiomaxatomic;
212 return c;
213 }
214
215 static void
pipeclose(Chan * c)216 pipeclose(Chan *c)
217 {
218 Pipe *p;
219
220 p = c->aux;
221 qlock(&p->lk);
222
223 if(c->flag & COPEN){
224 /*
225 * closing either side hangs up the stream
226 */
227 switch(NETTYPE(c->qid.path)){
228 case Qdata0:
229 p->qref[0]--;
230 if(p->qref[0] == 0){
231 qhangup(p->q[1], 0);
232 qclose(p->q[0]);
233 }
234 break;
235 case Qdata1:
236 p->qref[1]--;
237 if(p->qref[1] == 0){
238 qhangup(p->q[0], 0);
239 qclose(p->q[1]);
240 }
241 break;
242 }
243 }
244
245
246 /*
247 * if both sides are closed, they are reusable
248 */
249 if(p->qref[0] == 0 && p->qref[1] == 0){
250 qreopen(p->q[0]);
251 qreopen(p->q[1]);
252 }
253
254 /*
255 * free the structure on last close
256 */
257 p->ref--;
258 if(p->ref == 0){
259 qunlock(&p->lk);
260 free(p->q[0]);
261 free(p->q[1]);
262 free(p);
263 } else
264 qunlock(&p->lk);
265 }
266
267 static long
piperead(Chan * c,void * va,long n,vlong offset)268 piperead(Chan *c, void *va, long n, vlong offset)
269 {
270 Pipe *p;
271
272 USED(offset);
273
274 p = c->aux;
275
276 switch(NETTYPE(c->qid.path)){
277 case Qdir:
278 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
279 case Qdata0:
280 return qread(p->q[0], va, n);
281 case Qdata1:
282 return qread(p->q[1], va, n);
283 default:
284 panic("piperead");
285 }
286 return -1; /* not reached */
287 }
288
289 static Block*
pipebread(Chan * c,long n,ulong offset)290 pipebread(Chan *c, long n, ulong offset)
291 {
292 Pipe *p;
293
294 p = c->aux;
295
296 switch(NETTYPE(c->qid.path)){
297 case Qdata0:
298 return qbread(p->q[0], n);
299 case Qdata1:
300 return qbread(p->q[1], n);
301 }
302
303 return devbread(c, n, offset);
304 }
305
306 /*
307 * a write to a closed pipe causes a note to be sent to
308 * the process.
309 */
310 static long
pipewrite(Chan * c,void * va,long n,vlong offset)311 pipewrite(Chan *c, void *va, long n, vlong offset)
312 {
313 Pipe *p;
314
315 USED(offset);
316 if(!islo())
317 print("pipewrite hi %lux\n", getcallerpc(&c));
318
319 if(waserror()) {
320 /* avoid notes when pipe is a mounted queue */
321 if((c->flag & CMSG) == 0)
322 postnote(up, 1, "sys: write on closed pipe", NUser);
323 nexterror();
324 }
325
326 p = c->aux;
327
328 switch(NETTYPE(c->qid.path)){
329 case Qdata0:
330 n = qwrite(p->q[1], va, n);
331 break;
332
333 case Qdata1:
334 n = qwrite(p->q[0], va, n);
335 break;
336
337 default:
338 panic("pipewrite");
339 }
340
341 poperror();
342 return n;
343 }
344
345 static long
pipebwrite(Chan * c,Block * bp,ulong offset)346 pipebwrite(Chan *c, Block *bp, ulong offset)
347 {
348 long n;
349 Pipe *p;
350
351 USED(offset);
352
353 if(waserror()) {
354 /* avoid notes when pipe is a mounted queue */
355 if((c->flag & CMSG) == 0)
356 postnote(up, 1, "sys: write on closed pipe", NUser);
357 nexterror();
358 }
359
360 p = c->aux;
361 switch(NETTYPE(c->qid.path)){
362 case Qdata0:
363 n = qbwrite(p->q[1], bp);
364 break;
365
366 case Qdata1:
367 n = qbwrite(p->q[0], bp);
368 break;
369
370 default:
371 n = 0;
372 panic("pipebwrite");
373 }
374
375 poperror();
376 return n;
377 }
378
379 Dev pipedevtab = {
380 '|',
381 "pipe",
382
383 devreset,
384 pipeinit,
385 devshutdown,
386 pipeattach,
387 pipewalk,
388 pipestat,
389 pipeopen,
390 devcreate,
391 pipeclose,
392 piperead,
393 pipebread,
394 pipewrite,
395 pipebwrite,
396 devremove,
397 devwstat,
398 };
399