1 #include "u.h" 2 #include "../port/lib.h" 3 #include "mem.h" 4 #include "dat.h" 5 #include "fns.h" 6 #include "../port/error.h" 7 8 #include "netif.h" 9 10 typedef struct Pipe Pipe; 11 struct Pipe 12 { 13 QLock; 14 Pipe *next; 15 int ref; 16 ulong path; 17 long perm; 18 Queue *q[2]; 19 int qref[2]; 20 }; 21 22 struct 23 { 24 Lock; 25 ulong path; 26 } pipealloc; 27 28 enum 29 { 30 Qdir, 31 Qdata0, 32 Qdata1, 33 }; 34 35 Dirtab pipedir[] = 36 { 37 ".", {Qdir,0,QTDIR}, 0, DMDIR|0500, 38 "data", {Qdata0}, 0, 0600, 39 "data1", {Qdata1}, 0, 0600, 40 }; 41 #define NPIPEDIR 3 42 43 static void 44 pipeinit(void) 45 { 46 if(conf.pipeqsize == 0){ 47 if(conf.nmach > 1) 48 conf.pipeqsize = 256*1024; 49 else 50 conf.pipeqsize = 32*1024; 51 } 52 } 53 54 /* 55 * create a pipe, no streams are created until an open 56 */ 57 static Chan* 58 pipeattach(char *spec) 59 { 60 Pipe *p; 61 Chan *c; 62 63 c = devattach('|', spec); 64 p = malloc(sizeof(Pipe)); 65 if(p == 0) 66 exhausted("memory"); 67 p->ref = 1; 68 69 p->q[0] = qopen(conf.pipeqsize, 0, 0, 0); 70 if(p->q[0] == 0){ 71 free(p); 72 exhausted("memory"); 73 } 74 p->q[1] = qopen(conf.pipeqsize, 0, 0, 0); 75 if(p->q[1] == 0){ 76 free(p->q[0]); 77 free(p); 78 exhausted("memory"); 79 } 80 81 lock(&pipealloc); 82 p->path = ++pipealloc.path; 83 unlock(&pipealloc); 84 p->perm = pipedir[Qdata0].perm; 85 86 mkqid(&c->qid, NETQID(2*p->path, Qdir), 0, QTDIR); 87 c->aux = p; 88 c->dev = 0; 89 return c; 90 } 91 92 static int 93 pipegen(Chan *c, char*, Dirtab *tab, int ntab, int i, Dir *dp) 94 { 95 Qid q; 96 int len; 97 Pipe *p; 98 99 if(i == DEVDOTDOT){ 100 devdir(c, c->qid, "#|", 0, eve, DMDIR|0555, dp); 101 return 1; 102 } 103 i++; /* skip . */ 104 if(tab==0 || i>=ntab) 105 return -1; 106 107 tab += i; 108 p = c->aux; 109 switch((ulong)tab->qid.path){ 110 case Qdata0: 111 len = qlen(p->q[0]); 112 break; 113 case Qdata1: 114 len = qlen(p->q[1]); 115 break; 116 default: 117 len = tab->length; 118 break; 119 } 120 mkqid(&q, NETQID(NETID(c->qid.path), tab->qid.path), 0, QTFILE); 121 devdir(c, q, tab->name, len, eve, p->perm, dp); 122 return 1; 123 } 124 125 126 static Walkqid* 127 pipewalk(Chan *c, Chan *nc, char **name, int nname) 128 { 129 Walkqid *wq; 130 Pipe *p; 131 132 wq = devwalk(c, nc, name, nname, pipedir, NPIPEDIR, pipegen); 133 if(wq != nil && wq->clone != nil && wq->clone != c){ 134 p = c->aux; 135 qlock(p); 136 p->ref++; 137 if(c->flag & COPEN){ 138 print("channel open in pipewalk\n"); 139 switch(NETTYPE(c->qid.path)){ 140 case Qdata0: 141 p->qref[0]++; 142 break; 143 case Qdata1: 144 p->qref[1]++; 145 break; 146 } 147 } 148 qunlock(p); 149 } 150 return wq; 151 } 152 153 static int 154 pipestat(Chan *c, uchar *db, int n) 155 { 156 Pipe *p; 157 Dir dir; 158 159 p = c->aux; 160 161 switch(NETTYPE(c->qid.path)){ 162 case Qdir: 163 devdir(c, c->qid, ".", 0, eve, DMDIR|0555, &dir); 164 break; 165 case Qdata0: 166 devdir(c, c->qid, "data", qlen(p->q[0]), eve, p->perm, &dir); 167 break; 168 case Qdata1: 169 devdir(c, c->qid, "data1", qlen(p->q[1]), eve, p->perm, &dir); 170 break; 171 default: 172 panic("pipestat"); 173 } 174 n = convD2M(&dir, db, n); 175 if(n < BIT16SZ) 176 error(Eshortstat); 177 return n; 178 } 179 180 static int 181 pipewstat(Chan* c, uchar* db, int n) 182 { 183 int m; 184 Dir *dir; 185 Pipe *p; 186 187 p = c->aux; 188 if(strcmp(up->user, eve) != 0) 189 error(Eperm); 190 if(NETTYPE(c->qid.path) == Qdir) 191 error(Eisdir); 192 193 dir = smalloc(sizeof(Dir)+n); 194 if(waserror()){ 195 free(dir); 196 nexterror(); 197 } 198 m = convM2D(db, n, &dir[0], (char*)&dir[1]); 199 if(m == 0) 200 error(Eshortstat); 201 if(!emptystr(dir[0].uid)) 202 error("can't change owner"); 203 if(dir[0].mode != ~0UL) 204 p->perm = dir[0].mode; 205 poperror(); 206 free(dir); 207 return m; 208 } 209 210 /* 211 * if the stream doesn't exist, create it 212 */ 213 static Chan* 214 pipeopen(Chan *c, int omode) 215 { 216 Pipe *p; 217 218 if(c->qid.type & QTDIR){ 219 if(omode != OREAD) 220 error(Ebadarg); 221 c->mode = omode; 222 c->flag |= COPEN; 223 c->offset = 0; 224 return c; 225 } 226 227 p = c->aux; 228 qlock(p); 229 switch(NETTYPE(c->qid.path)){ 230 case Qdata0: 231 p->qref[0]++; 232 break; 233 case Qdata1: 234 p->qref[1]++; 235 break; 236 } 237 qunlock(p); 238 239 c->mode = openmode(omode); 240 c->flag |= COPEN; 241 c->offset = 0; 242 c->iounit = qiomaxatomic; 243 return c; 244 } 245 246 static void 247 pipeclose(Chan *c) 248 { 249 Pipe *p; 250 251 p = c->aux; 252 qlock(p); 253 254 if(c->flag & COPEN){ 255 /* 256 * closing either side hangs up the stream 257 */ 258 switch(NETTYPE(c->qid.path)){ 259 case Qdata0: 260 p->qref[0]--; 261 if(p->qref[0] == 0){ 262 qhangup(p->q[1], 0); 263 qclose(p->q[0]); 264 } 265 break; 266 case Qdata1: 267 p->qref[1]--; 268 if(p->qref[1] == 0){ 269 qhangup(p->q[0], 0); 270 qclose(p->q[1]); 271 } 272 break; 273 } 274 } 275 276 277 /* 278 * if both sides are closed, they are reusable 279 */ 280 if(p->qref[0] == 0 && p->qref[1] == 0){ 281 qreopen(p->q[0]); 282 qreopen(p->q[1]); 283 } 284 285 /* 286 * free the structure on last close 287 */ 288 p->ref--; 289 if(p->ref == 0){ 290 qunlock(p); 291 free(p->q[0]); 292 free(p->q[1]); 293 free(p); 294 } else 295 qunlock(p); 296 } 297 298 static long 299 piperead(Chan *c, void *va, long n, vlong) 300 { 301 Pipe *p; 302 303 p = c->aux; 304 305 switch(NETTYPE(c->qid.path)){ 306 case Qdir: 307 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen); 308 case Qdata0: 309 return qread(p->q[0], va, n); 310 case Qdata1: 311 return qread(p->q[1], va, n); 312 default: 313 panic("piperead"); 314 } 315 return -1; /* not reached */ 316 } 317 318 static Block* 319 pipebread(Chan *c, long n, ulong offset) 320 { 321 Pipe *p; 322 323 p = c->aux; 324 325 switch(NETTYPE(c->qid.path)){ 326 case Qdata0: 327 return qbread(p->q[0], n); 328 case Qdata1: 329 return qbread(p->q[1], n); 330 } 331 332 return devbread(c, n, offset); 333 } 334 335 /* 336 * a write to a closed pipe causes a note to be sent to 337 * the process. 338 */ 339 static long 340 pipewrite(Chan *c, void *va, long n, vlong) 341 { 342 Pipe *p; 343 344 if(!islo()) 345 print("pipewrite hi %#p\n", getcallerpc(&c)); 346 if(waserror()) { 347 /* avoid notes when pipe is a mounted queue */ 348 if((c->flag & CMSG) == 0) 349 postnote(up, 1, "sys: write on closed pipe", NUser); 350 nexterror(); 351 } 352 353 p = c->aux; 354 355 switch(NETTYPE(c->qid.path)){ 356 case Qdata0: 357 n = qwrite(p->q[1], va, n); 358 break; 359 360 case Qdata1: 361 n = qwrite(p->q[0], va, n); 362 break; 363 364 default: 365 panic("pipewrite"); 366 } 367 368 poperror(); 369 return n; 370 } 371 372 static long 373 pipebwrite(Chan *c, Block *bp, ulong) 374 { 375 long n; 376 Pipe *p; 377 378 if(waserror()) { 379 /* avoid notes when pipe is a mounted queue */ 380 if((c->flag & CMSG) == 0) 381 postnote(up, 1, "sys: write on closed pipe", NUser); 382 nexterror(); 383 } 384 385 p = c->aux; 386 switch(NETTYPE(c->qid.path)){ 387 case Qdata0: 388 n = qbwrite(p->q[1], bp); 389 break; 390 391 case Qdata1: 392 n = qbwrite(p->q[0], bp); 393 break; 394 395 default: 396 n = 0; 397 panic("pipebwrite"); 398 } 399 400 poperror(); 401 return n; 402 } 403 404 Dev pipedevtab = { 405 '|', 406 "pipe", 407 408 devreset, 409 pipeinit, 410 devshutdown, 411 pipeattach, 412 pipewalk, 413 pipestat, 414 pipeopen, 415 devcreate, 416 pipeclose, 417 piperead, 418 pipebread, 419 pipewrite, 420 pipebwrite, 421 devremove, 422 pipewstat, 423 }; 424