1 #include "u.h" 2 #include "lib.h" 3 #include "dat.h" 4 #include "fns.h" 5 #include "error.h" 6 7 #include "netif.h" 8 9 typedef struct Pipe Pipe; 10 struct Pipe 11 { 12 QLock lk; 13 Pipe *next; 14 int ref; 15 ulong path; 16 Queue *q[2]; 17 int qref[2]; 18 }; 19 20 struct 21 { 22 Lock lk; 23 ulong path; 24 } pipealloc; 25 26 enum 27 { 28 Qdir, 29 Qdata0, 30 Qdata1, 31 }; 32 33 Dirtab pipedir[] = 34 { 35 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500, 36 "data", {Qdata0}, 0, 0600, 37 "data1", {Qdata1}, 0, 0600, 38 }; 39 #define NPIPEDIR 3 40 41 static void 42 pipeinit(void) 43 { 44 if(conf.pipeqsize == 0){ 45 if(conf.nmach > 1) 46 conf.pipeqsize = 256*1024; 47 else 48 conf.pipeqsize = 32*1024; 49 } 50 } 51 52 /* 53 * create a pipe, no streams are created until an open 54 */ 55 static Chan* 56 pipeattach(char *spec) 57 { 58 Pipe *p; 59 Chan *c; 60 61 c = devattach('|', spec); 62 p = malloc(sizeof(Pipe)); 63 if(p == 0) 64 exhausted("memory"); 65 p->ref = 1; 66 67 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0); 68 if(p->q[0] == 0){ 69 free(p); 70 exhausted("memory"); 71 } 72 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0); 73 if(p->q[1] == 0){ 74 free(p->q[0]); 75 free(p); 76 exhausted("memory"); 77 } 78 79 lock(&pipealloc.lk); 80 p->path = ++pipealloc.path; 81 unlock(&pipealloc.lk); 82 83 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR); 84 c->aux = p; 85 c->dev = 0; 86 return c; 87 } 88 89 static int 90 pipegen(Chan *c, char *name, Dirtab *tab, int ntab, int i, Dir *dp) 91 { 92 Qid q; 93 int len; 94 Pipe *p; 95 96 USED(name); 97 98 if(i == DEVDOTDOT){ 99 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp); 100 return 1; 101 } 102 i++; /* skip . */ 103 if(tab==0 || i>=ntab) 104 return -1; 105 106 tab += i; 107 p = c->aux; 108 switch((ulong)tab->qid.path){ 109 case Qdata0: 110 len = qlen(p->q[0]); 111 break; 112 case Qdata1: 113 len = qlen(p->q[1]); 114 break; 115 default: 116 len = tab->length; 117 break; 118 } 119 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE); 120 devdir(c, q, tab->name, len, eve, tab->perm, dp); 121 return 1; 122 } 123 124 125 static Walkqid* 126 pipewalk(Chan *c, Chan *nc, char **name, int nname) 127 { 128 Walkqid *wq; 129 Pipe *p; 130 131 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen); 132 if(wq != nil && wq->clone != nil && wq->clone != c){ 133 p = c->aux; 134 qlock(&p->lk); 135 p->ref++; 136 if(c->flag & COPEN){ 137 print("channel open in pipewalk\n"); 138 switch(NETTYPE(c->qid.path)){ 139 case Qdata0: 140 p->qref[0]++; 141 break; 142 case Qdata1: 143 p->qref[1]++; 144 break; 145 } 146 } 147 qunlock(&p->lk); 148 } 149 return wq; 150 } 151 152 static int 153 pipestat(Chan *c, uchar *db, int n) 154 { 155 Pipe *p; 156 Dir dir; 157 158 p = c->aux; 159 160 switch(NETTYPE(c->qid.path)){ 161 case Qdir: 162 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir); 163 break; 164 case Qdata0: 165 devdir(c, c->qid, "data", qlen(p->q[0]), eve, 0600, &dir); 166 break; 167 case Qdata1: 168 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, 0600, &dir); 169 break; 170 default: 171 panic("pipestat"); 172 } 173 n = convD2M(&dir, db, n); 174 if(n < BIT16SZ) 175 error(Eshortstat); 176 return n; 177 } 178 179 /* 180 * if the stream doesn't exist, create it 181 */ 182 static Chan* 183 pipeopen(Chan *c, int omode) 184 { 185 Pipe *p; 186 187 if(c->qid.type & QTDIR){ 188 if(omode != OREAD) 189 error(Ebadarg); 190 c->mode = omode; 191 c->flag |= COPEN; 192 c->offset = 0; 193 return c; 194 } 195 196 p = c->aux; 197 qlock(&p->lk); 198 switch(NETTYPE(c->qid.path)){ 199 case Qdata0: 200 p->qref[0]++; 201 break; 202 case Qdata1: 203 p->qref[1]++; 204 break; 205 } 206 qunlock(&p->lk); 207 208 c->mode = openmode(omode); 209 c->flag |= COPEN; 210 c->offset = 0; 211 c->iounit = qiomaxatomic; 212 return c; 213 } 214 215 static void 216 pipeclose(Chan *c) 217 { 218 Pipe *p; 219 220 p = c->aux; 221 qlock(&p->lk); 222 223 if(c->flag & COPEN){ 224 /* 225 * closing either side hangs up the stream 226 */ 227 switch(NETTYPE(c->qid.path)){ 228 case Qdata0: 229 p->qref[0]--; 230 if(p->qref[0] == 0){ 231 qhangup(p->q[1], 0); 232 qclose(p->q[0]); 233 } 234 break; 235 case Qdata1: 236 p->qref[1]--; 237 if(p->qref[1] == 0){ 238 qhangup(p->q[0], 0); 239 qclose(p->q[1]); 240 } 241 break; 242 } 243 } 244 245 246 /* 247 * if both sides are closed, they are reusable 248 */ 249 if(p->qref[0] == 0 && p->qref[1] == 0){ 250 qreopen(p->q[0]); 251 qreopen(p->q[1]); 252 } 253 254 /* 255 * free the structure on last close 256 */ 257 p->ref--; 258 if(p->ref == 0){ 259 qunlock(&p->lk); 260 free(p->q[0]); 261 free(p->q[1]); 262 free(p); 263 } else 264 qunlock(&p->lk); 265 } 266 267 static long 268 piperead(Chan *c, void *va, long n, vlong offset) 269 { 270 Pipe *p; 271 272 USED(offset); 273 274 p = c->aux; 275 276 switch(NETTYPE(c->qid.path)){ 277 case Qdir: 278 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen); 279 case Qdata0: 280 return qread(p->q[0], va, n); 281 case Qdata1: 282 return qread(p->q[1], va, n); 283 default: 284 panic("piperead"); 285 } 286 return -1; /* not reached */ 287 } 288 289 static Block* 290 pipebread(Chan *c, long n, ulong offset) 291 { 292 Pipe *p; 293 294 p = c->aux; 295 296 switch(NETTYPE(c->qid.path)){ 297 case Qdata0: 298 return qbread(p->q[0], n); 299 case Qdata1: 300 return qbread(p->q[1], n); 301 } 302 303 return devbread(c, n, offset); 304 } 305 306 /* 307 * a write to a closed pipe causes a note to be sent to 308 * the process. 309 */ 310 static long 311 pipewrite(Chan *c, void *va, long n, vlong offset) 312 { 313 Pipe *p; 314 315 USED(offset); 316 if(!islo()) 317 print("pipewrite hi %lux\n", getcallerpc(&c)); 318 319 if(waserror()) { 320 /* avoid notes when pipe is a mounted queue */ 321 if((c->flag & CMSG) == 0) 322 postnote(up, 1, "sys: write on closed pipe", NUser); 323 nexterror(); 324 } 325 326 p = c->aux; 327 328 switch(NETTYPE(c->qid.path)){ 329 case Qdata0: 330 n = qwrite(p->q[1], va, n); 331 break; 332 333 case Qdata1: 334 n = qwrite(p->q[0], va, n); 335 break; 336 337 default: 338 panic("pipewrite"); 339 } 340 341 poperror(); 342 return n; 343 } 344 345 static long 346 pipebwrite(Chan *c, Block *bp, ulong offset) 347 { 348 long n; 349 Pipe *p; 350 351 USED(offset); 352 353 if(waserror()) { 354 /* avoid notes when pipe is a mounted queue */ 355 if((c->flag & CMSG) == 0) 356 postnote(up, 1, "sys: write on closed pipe", NUser); 357 nexterror(); 358 } 359 360 p = c->aux; 361 switch(NETTYPE(c->qid.path)){ 362 case Qdata0: 363 n = qbwrite(p->q[1], bp); 364 break; 365 366 case Qdata1: 367 n = qbwrite(p->q[0], bp); 368 break; 369 370 default: 371 n = 0; 372 panic("pipebwrite"); 373 } 374 375 poperror(); 376 return n; 377 } 378 379 Dev pipedevtab = { 380 '|', 381 "pipe", 382 383 devreset, 384 pipeinit, 385 devshutdown, 386 pipeattach, 387 pipewalk, 388 pipestat, 389 pipeopen, 390 devcreate, 391 pipeclose, 392 piperead, 393 pipebread, 394 pipewrite, 395 pipebwrite, 396 devremove, 397 devwstat, 398 }; 399