1 #include "u.h" 2 #include "../port/lib.h" 3 #include "mem.h" 4 #include "dat.h" 5 #include "fns.h" 6 #include "../port/error.h" 7 8 #include "devtab.h" 9 10 typedef struct Pipe Pipe; 11 struct Pipe 12 { 13 Ref; 14 QLock; 15 Pipe *next; 16 ulong path; 17 }; 18 19 struct 20 { 21 Lock; 22 Pipe *pipe; 23 ulong path; 24 } pipealloc; 25 26 static Pipe *getpipe(ulong); 27 static void pipeiput(Queue*, Block*); 28 static void pipeoput(Queue*, Block*); 29 static void pipestclose(Queue *); 30 31 Qinfo pipeinfo = 32 { 33 pipeiput, 34 pipeoput, 35 0, 36 pipestclose, 37 "pipe" 38 }; 39 40 Dirtab pipedir[] = 41 { 42 "data", {Sdataqid}, 0, 0600, 43 "ctl", {Sctlqid}, 0, 0600, 44 "data1", {Sdataqid}, 0, 0600, 45 "ctl1", {Sctlqid}, 0, 0600, 46 }; 47 #define NPIPEDIR 4 48 49 void 50 pipeinit(void) 51 { 52 } 53 54 void 55 pipereset(void) 56 { 57 } 58 59 /* 60 * create a pipe, no streams are created until an open 61 */ 62 Chan* 63 pipeattach(char *spec) 64 { 65 Pipe *p; 66 Chan *c; 67 68 c = devattach('|', spec); 69 p = smalloc(sizeof(Pipe)); 70 p->ref = 1; 71 72 lock(&pipealloc); 73 p->path = ++pipealloc.path; 74 p->next = pipealloc.pipe; 75 pipealloc.pipe = p; 76 unlock(&pipealloc); 77 78 c->qid = (Qid){CHDIR|STREAMQID(2*p->path, 0), 0}; 79 c->dev = 0; 80 return c; 81 } 82 83 Chan* 84 pipeclone(Chan *c, Chan *nc) 85 { 86 Pipe *p; 87 88 p = getpipe(STREAMID(c->qid.path)/2); 89 nc = devclone(c, nc); 90 if(incref(p) <= 1) 91 panic("pipeclone"); 92 return nc; 93 } 94 95 int 96 pipegen(Chan *c, Dirtab *tab, int ntab, int i, Dir *dp) 97 { 98 int id; 99 100 id = STREAMID(c->qid.path); 101 if(i > 1) 102 id++; 103 if(tab==0 || i>=ntab) 104 return -1; 105 tab += i; 106 devdir(c, (Qid){STREAMQID(id, tab->qid.path),0}, tab->name, tab->length, eve, tab->perm, dp); 107 return 1; 108 } 109 110 111 int 112 pipewalk(Chan *c, char *name) 113 { 114 return devwalk(c, name, pipedir, NPIPEDIR, pipegen); 115 } 116 117 void 118 pipestat(Chan *c, char *db) 119 { 120 streamstat(c, db, "pipe", 0666); 121 } 122 123 /* 124 * if the stream doesn't exist, create it 125 */ 126 Chan * 127 pipeopen(Chan *c, int omode) 128 { 129 Pipe *p; 130 int other; 131 Stream *local, *remote; 132 133 if(c->qid.path & CHDIR){ 134 if(omode != OREAD) 135 error(Ebadarg); 136 c->mode = omode; 137 c->flag |= COPEN; 138 c->offset = 0; 139 return c; 140 } 141 142 p = getpipe(STREAMID(c->qid.path)/2); 143 if(waserror()){ 144 qunlock(p); 145 nexterror(); 146 } 147 qlock(p); 148 streamopen(c, &pipeinfo); 149 local = c->stream; 150 if(local->devq->ptr == 0){ 151 /* 152 * first open, create the other end also 153 */ 154 other = STREAMID(c->qid.path)^1; 155 remote = streamnew(c->type, c->dev, other, &pipeinfo,1); 156 157 /* 158 * connect the device ends of both streams 159 */ 160 local->devq->ptr = remote; 161 remote->devq->ptr = local; 162 local->devq->other->next = remote->devq; 163 remote->devq->other->next = local->devq; 164 } else if(local->opens == 1){ 165 /* 166 * keep other side around till last close of this side 167 */ 168 streamenter(local->devq->ptr); 169 } 170 qunlock(p); 171 poperror(); 172 173 c->mode = openmode(omode); 174 c->flag |= COPEN; 175 c->offset = 0; 176 return c; 177 } 178 179 void 180 pipecreate(Chan *c, char *name, int omode, ulong perm) 181 { 182 USED(c, name, omode, perm); 183 error(Egreg); 184 } 185 186 void 187 piperemove(Chan *c) 188 { 189 USED(c); 190 error(Egreg); 191 } 192 193 void 194 pipewstat(Chan *c, char *db) 195 { 196 USED(c, db); 197 error(Eperm); 198 } 199 200 void 201 pipeclose(Chan *c) 202 { 203 Pipe *p, *f, **l; 204 Stream *remote; 205 206 p = getpipe(STREAMID(c->qid.path)/2); 207 208 /* 209 * take care of local and remote streams 210 */ 211 if(c->stream){ 212 qlock(p); 213 remote = c->stream->devq->ptr; 214 if(streamclose(c) == 0){ 215 if(remote) 216 streamexit(remote); 217 } 218 qunlock(p); 219 } 220 221 /* 222 * free the structure 223 */ 224 if(decref(p) == 0){ 225 lock(&pipealloc); 226 l = &pipealloc.pipe; 227 for(f = *l; f; f = f->next) { 228 if(f == p) { 229 *l = p->next; 230 break; 231 } 232 l = &f->next; 233 } 234 unlock(&pipealloc); 235 free(p); 236 } 237 } 238 239 long 240 piperead(Chan *c, void *va, long n, ulong offset) 241 { 242 USED(offset); 243 if(c->qid.path & CHDIR) 244 return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen); 245 246 return streamread(c, va, n); 247 } 248 249 /* 250 * a write to a closed pipe causes a note to be sent to 251 * the process. 252 */ 253 long 254 pipewrite(Chan *c, void *va, long n, ulong offset) 255 { 256 USED(offset); 257 258 /* avoid notes when pipe is a mounted stream */ 259 if(c->flag & CMSG) 260 return streamwrite(c, va, n, 0); 261 262 if(waserror()) { 263 postnote(u->p, 1, "sys: write on closed pipe", NUser); 264 error(Ehungup); 265 } 266 n = streamwrite(c, va, n, 0); 267 poperror(); 268 return n; 269 } 270 271 /* 272 * send a block upstream to the process. 273 * sleep until there's room upstream. 274 */ 275 static void 276 pipeiput(Queue *q, Block *bp) 277 { 278 FLOWCTL(q, bp); 279 } 280 281 /* 282 * send the block to the other side 283 */ 284 static void 285 pipeoput(Queue *q, Block *bp) 286 { 287 PUTNEXT(q, bp); 288 } 289 290 /* 291 * send a hangup and disconnect the streams 292 */ 293 static void 294 pipestclose(Queue *q) 295 { 296 Block *bp; 297 298 /* 299 * point to the bit-bucket and let any in-progress 300 * write's finish. 301 */ 302 q->put = nullput; 303 wakeup(&q->r); 304 305 /* 306 * send a hangup 307 */ 308 q = q->other; 309 if(q->next == 0) 310 return; 311 bp = allocb(0); 312 bp->type = M_HANGUP; 313 PUTNEXT(q, bp); 314 } 315 316 Pipe* 317 getpipe(ulong path) 318 { 319 Pipe *p; 320 321 lock(&pipealloc); 322 for(p = pipealloc.pipe; p; p = p->next) { 323 if(path == p->path) { 324 unlock(&pipealloc); 325 return p; 326 } 327 } 328 unlock(&pipealloc); 329 panic("getpipe"); 330 return 0; /* not reached */ 331 } 332