xref: /plan9/sys/src/9/port/devpipe.c (revision 219b2ee8daee37f4aad58d63f21287faa8e4ffdc)
1 #include	"u.h"
2 #include	"../port/lib.h"
3 #include	"mem.h"
4 #include	"dat.h"
5 #include	"fns.h"
6 #include	"../port/error.h"
7 
8 #include	"devtab.h"
9 
10 typedef struct Pipe	Pipe;
11 struct Pipe
12 {
13 	Ref;
14 	QLock;
15 	Pipe	*next;
16 	ulong	path;
17 };
18 
19 struct
20 {
21 	Lock;
22 	Pipe	*pipe;
23 	ulong	path;
24 } pipealloc;
25 
26 static Pipe *getpipe(ulong);
27 static void pipeiput(Queue*, Block*);
28 static void pipeoput(Queue*, Block*);
29 static void pipestclose(Queue *);
30 
31 Qinfo pipeinfo =
32 {
33 	pipeiput,
34 	pipeoput,
35 	0,
36 	pipestclose,
37 	"pipe"
38 };
39 
40 Dirtab pipedir[] =
41 {
42 	"data",		{Sdataqid},	0,			0600,
43 	"ctl",		{Sctlqid},	0,			0600,
44 	"data1",	{Sdataqid},	0,			0600,
45 	"ctl1",		{Sctlqid},	0,			0600,
46 };
47 #define NPIPEDIR 4
48 
/*
 *  device init hook; the pipe device needs no setup
 */
void
pipeinit(void)
{
	/* nothing to do */
}
53 
/*
 *  device reset hook; the pipe device keeps no state to reset
 */
void
pipereset(void)
{
	/* nothing to do */
}
58 
59 /*
60  *  create a pipe, no streams are created until an open
61  */
62 Chan*
63 pipeattach(char *spec)
64 {
65 	Pipe *p;
66 	Chan *c;
67 
68 	c = devattach('|', spec);
69 	p = smalloc(sizeof(Pipe));
70 	p->ref = 1;
71 
72 	lock(&pipealloc);
73 	p->path = ++pipealloc.path;
74 	p->next = pipealloc.pipe;
75 	pipealloc.pipe = p;
76 	unlock(&pipealloc);
77 
78 	c->qid = (Qid){CHDIR|STREAMQID(2*p->path, 0), 0};
79 	c->dev = 0;
80 	return c;
81 }
82 
83 Chan*
84 pipeclone(Chan *c, Chan *nc)
85 {
86 	Pipe *p;
87 
88 	p = getpipe(STREAMID(c->qid.path)/2);
89 	nc = devclone(c, nc);
90 	if(incref(p) <= 1)
91 		panic("pipeclone");
92 	return nc;
93 }
94 
95 int
96 pipegen(Chan *c, Dirtab *tab, int ntab, int i, Dir *dp)
97 {
98 	int id;
99 
100 	id = STREAMID(c->qid.path);
101 	if(i > 1)
102 		id++;
103 	if(tab==0 || i>=ntab)
104 		return -1;
105 	tab += i;
106 	devdir(c, (Qid){STREAMQID(id, tab->qid.path),0}, tab->name, tab->length, eve, tab->perm, dp);
107 	return 1;
108 }
109 
110 
111 int
112 pipewalk(Chan *c, char *name)
113 {
114 	return devwalk(c, name, pipedir, NPIPEDIR, pipegen);
115 }
116 
117 void
118 pipestat(Chan *c, char *db)
119 {
120 	streamstat(c, db, "pipe", 0666);
121 }
122 
123 /*
124  *  if the stream doesn't exist, create it
125  */
126 Chan *
127 pipeopen(Chan *c, int omode)
128 {
129 	Pipe *p;
130 	int other;
131 	Stream *local, *remote;
132 
133 	if(c->qid.path & CHDIR){
134 		if(omode != OREAD)
135 			error(Ebadarg);
136 		c->mode = omode;
137 		c->flag |= COPEN;
138 		c->offset = 0;
139 		return c;
140 	}
141 
142 	p = getpipe(STREAMID(c->qid.path)/2);
143 	if(waserror()){
144 		qunlock(p);
145 		nexterror();
146 	}
147 	qlock(p);
148 	streamopen(c, &pipeinfo);
149 	local = c->stream;
150 	if(local->devq->ptr == 0){
151 		/*
152 		 *  first open, create the other end also
153 		 */
154 		other = STREAMID(c->qid.path)^1;
155 		remote = streamnew(c->type, c->dev, other, &pipeinfo,1);
156 
157 		/*
158 		 *  connect the device ends of both streams
159 		 */
160 		local->devq->ptr = remote;
161 		remote->devq->ptr = local;
162 		local->devq->other->next = remote->devq;
163 		remote->devq->other->next = local->devq;
164 	} else if(local->opens == 1){
165 		/*
166 		 *  keep other side around till last close of this side
167 		 */
168 		streamenter(local->devq->ptr);
169 	}
170 	qunlock(p);
171 	poperror();
172 
173 	c->mode = openmode(omode);
174 	c->flag |= COPEN;
175 	c->offset = 0;
176 	return c;
177 }
178 
179 void
180 pipecreate(Chan *c, char *name, int omode, ulong perm)
181 {
182 	USED(c, name, omode, perm);
183 	error(Egreg);
184 }
185 
186 void
187 piperemove(Chan *c)
188 {
189 	USED(c);
190 	error(Egreg);
191 }
192 
193 void
194 pipewstat(Chan *c, char *db)
195 {
196 	USED(c, db);
197 	error(Eperm);
198 }
199 
200 void
201 pipeclose(Chan *c)
202 {
203 	Pipe *p, *f, **l;
204 	Stream *remote;
205 
206 	p = getpipe(STREAMID(c->qid.path)/2);
207 
208 	/*
209 	 *  take care of local and remote streams
210 	 */
211 	if(c->stream){
212 		qlock(p);
213 		remote = c->stream->devq->ptr;
214 		if(streamclose(c) == 0){
215 			if(remote)
216 				streamexit(remote);
217 		}
218 		qunlock(p);
219 	}
220 
221 	/*
222 	 *  free the structure
223 	 */
224 	if(decref(p) == 0){
225 		lock(&pipealloc);
226 		l = &pipealloc.pipe;
227 		for(f = *l; f; f = f->next) {
228 			if(f == p) {
229 				*l = p->next;
230 				break;
231 			}
232 			l = &f->next;
233 		}
234 		unlock(&pipealloc);
235 		free(p);
236 	}
237 }
238 
239 long
240 piperead(Chan *c, void *va, long n, ulong offset)
241 {
242 	USED(offset);
243 	if(c->qid.path & CHDIR)
244 		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen);
245 
246 	return streamread(c, va, n);
247 }
248 
249 /*
250  *  a write to a closed pipe causes a note to be sent to
251  *  the process.
252  */
253 long
254 pipewrite(Chan *c, void *va, long n, ulong offset)
255 {
256 	USED(offset);
257 
258 	/* avoid notes when pipe is a mounted stream */
259 	if(c->flag & CMSG)
260 		return streamwrite(c, va, n, 0);
261 
262 	if(waserror()) {
263 		postnote(u->p, 1, "sys: write on closed pipe", NUser);
264 		error(Ehungup);
265 	}
266 	n = streamwrite(c, va, n, 0);
267 	poperror();
268 	return n;
269 }
270 
271 /*
272  *  send a block upstream to the process.
273  *  sleep until there's room upstream.
274  */
275 static void
276 pipeiput(Queue *q, Block *bp)
277 {
278 	FLOWCTL(q, bp);
279 }
280 
281 /*
282  *  send the block to the other side
283  */
284 static void
285 pipeoput(Queue *q, Block *bp)
286 {
287 	PUTNEXT(q, bp);
288 }
289 
290 /*
291  *  send a hangup and disconnect the streams
292  */
293 static void
294 pipestclose(Queue *q)
295 {
296 	Block *bp;
297 
298 	/*
299 	 *  point to the bit-bucket and let any in-progress
300 	 *  write's finish.
301 	 */
302 	q->put = nullput;
303 	wakeup(&q->r);
304 
305 	/*
306 	 *  send a hangup
307 	 */
308 	q = q->other;
309 	if(q->next == 0)
310 		return;
311 	bp = allocb(0);
312 	bp->type = M_HANGUP;
313 	PUTNEXT(q, bp);
314 }
315 
316 Pipe*
317 getpipe(ulong path)
318 {
319 	Pipe *p;
320 
321 	lock(&pipealloc);
322 	for(p = pipealloc.pipe; p; p = p->next) {
323 		if(path == p->path) {
324 			unlock(&pipealloc);
325 			return p;
326 		}
327 	}
328 	unlock(&pipealloc);
329 	panic("getpipe");
330 	return 0;		/* not reached */
331 }
332