xref: /plan9-contrib/sys/src/ape/lib/ap/plan9/_buf.c (revision 219b2ee8daee37f4aad58d63f21287faa8e4ffdc)
1 #define  _BSDTIME_EXTENSION
2 #define _LOCK_EXTENSION
3 #include "lib.h"
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <signal.h>
8 #include <string.h>
9 #include <stdio.h>
10 #include <lock.h>
11 #include <sys/time.h>
12 #include <select.h>
13 #include "sys9.h"
14 
15 typedef struct Muxseg {
16 	Lock		lock;				/* for mutual exclusion access to buffer variables */
17 	int		curfds;			/* number of fds currently buffered */
18 	int		selwait;			/* true if selecting process is waiting */
19 	int		waittime;			/* time for timer process to wait */
20 	fd_set	rwant;			/* fd's that select wants to read */
21 	fd_set	ewant;			/* fd's that select wants to know eof info on */
22 	Muxbuf	bufs[INITBUFS];	/* can grow, via segbrk() */
23 } Muxseg;
24 
25 #define MUXADDR ((void*)0x6000000)
26 static Muxseg *mux = 0;			/* shared memory segment */
27 
28 /* _muxsid and _killmuxsid are known in libbsd's listen.c */
29 int _muxsid = -1;			/* group id of copy processes */
30 static int _mainpid = -1;
31 static int timerpid = -1;			/* pid of a timer process */
32 
33 void _killmuxsid(void);
34 static void _copyproc(int, Muxbuf*);
35 static void _timerproc(void);
36 static void _resettimer(void);
37 
38 static int copynotehandler(void *, char *);
39 
40 /* assume FD_SETSIZE is 96 */
41 #define FD_ANYSET(p)	((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
42 
43 
44 /*
45  * Start making fd read-buffered: make the shared segment, if necessary,
46  * allocate a slot (index into mux->bufs), and fork a child to read the fd
47  * and write into the slot-indexed buffer.
48  * Return -1 if we can't do it.
49  */
50 int
51 _startbuf(int fd)
52 {
53 	long i, n, slot;
54 	int pid, sid;
55 	Fdinfo *f;
56 	Muxbuf *b;
57 
58 	if(mux == 0){
59 		_RFORK(RFREND);
60 		mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
61 		if((long)mux == -1){
62 			_syserrno();
63 			return -1;
64 		}
65 		/* segattach has returned zeroed memory */
66 		lockinit();
67 		atexit(_killmuxsid);
68 	}
69 
70 	slot = mux->curfds++;
71 	if(mux->curfds > INITBUFS) {
72 		if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
73 			_syserrno();
74 			return -1;
75 		}
76 	}
77 
78 	f = &_fdinfo[fd];
79 	b = &mux->bufs[slot];
80 	b->n = 0;
81 	b->putnext = b->data;
82 	b->getnext = b->data;
83 	b->eof = 0;
84 	b->fd = fd;
85 	if(_mainpid == -1)
86 		_mainpid = getpid();
87 	if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
88 		/* copy process ... */
89 		if(_muxsid == -1) {
90 			_RFORK(RFNOTEG);
91 			_muxsid = getpgrp();
92 		} else
93 			setpgid(getpid(), _muxsid);
94 		_NOTIFY(copynotehandler);
95 		for(i=0; i<OPEN_MAX; i++)
96 			if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
97 				_CLOSE(i);
98 		_RENDEZVOUS(0, _muxsid);
99 		_copyproc(fd, b);
100 	}
101 
102 	/* parent process continues ... */
103 	b->copypid = pid;
104 	f->buf = b;
105 	f->flags |= FD_BUFFERED;
106 	_muxsid = _RENDEZVOUS(0, 0);
107 	/* leave fd open in parent so system doesn't reuse it */
108 	return 0;
109 }
110 
111 /*
112  * The given buffered fd is being closed.
113  * Set the fd field in the shared buffer to -1 to tell copyproc
114  * to exit, and kill the copyproc.
115  */
116 void
117 _closebuf(int fd)
118 {
119 	Muxbuf *b;
120 
121 	b = _fdinfo[fd].buf;
122 	if(!b)
123 		return;
124 	lock(&mux->lock);
125 	b->fd = -1;
126 	unlock(&mux->lock);
127 	kill(b->copypid, SIGKILL);
128 }
129 
130 /* child copy procs execute this until eof */
131 static void
132 _copyproc(int fd, Muxbuf *b)
133 {
134 	unsigned char *e;
135 	int n;
136 	int nzeros;
137 
138 	e = &b->data[PERFDMAX];
139 	for(;;) {
140 		/* make sure there's room */
141 		lock(&mux->lock);
142 		if(e - b->putnext < READMAX) {
143 			if(b->getnext == b->putnext) {
144 				b->getnext = b->putnext = b->data;
145 				unlock(&mux->lock);
146 			} else {
147 				/* sleep until there's room */
148 				b->roomwait = 1;
149 				unlock(&mux->lock);
150 				_RENDEZVOUS((unsigned long)&b->roomwait, 0);
151 			}
152 		} else
153 			unlock(&mux->lock);
154 		/*
155 		 * A Zero-length _READ might mean a zero-length write
156 		 * happened, or it might mean eof; try several times to
157 		 * disambiguate (posix read() discards 0-length messages)
158 		 */
159 		nzeros = 0;
160 		do {
161 			n = _READ(fd, b->putnext, READMAX);
162 			if(b->fd == -1) {
163 				_exit(0);		/* we've been closed */
164 			}
165 		} while(n == 0 && ++nzeros < 3);
166 		lock(&mux->lock);
167 		if(n <= 0) {
168 			b->eof = 1;
169 			if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
170 				mux->selwait = 0;
171 				unlock(&mux->lock);
172 				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
173 			} else if(b->datawait) {
174 				b->datawait = 0;
175 				unlock(&mux->lock);
176 				_RENDEZVOUS((unsigned long)&b->datawait, 0);
177 			} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
178 				mux->selwait = 0;
179 				unlock(&mux->lock);
180 				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
181 			} else
182 				unlock(&mux->lock);
183 			_exit(0);
184 		} else {
185 			b->putnext += n;
186 			b->n += n;
187 			if(b->n > 0) {
188 				/* parent process cannot be both in datawait and selwait */
189 				if(b->datawait) {
190 					b->datawait = 0;
191 					unlock(&mux->lock);
192 					/* wake up _bufreading process */
193 					_RENDEZVOUS((unsigned long)&b->datawait, 0);
194 				} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
195 					mux->selwait = 0;
196 					unlock(&mux->lock);
197 					/* wake up selecting process */
198 					_RENDEZVOUS((unsigned long)&mux->selwait, fd);
199 				} else
200 					unlock(&mux->lock);
201 			} else
202 				unlock(&mux->lock);
203 		}
204 	}
205 }
206 
207 /* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
208 int
209 _readbuf(int fd, void *addr, int nwant, int noblock)
210 {
211 	Muxbuf *b;
212 	int ngot;
213 
214 	b = _fdinfo[fd].buf;
215 	if(b->eof && b->n == 0) {
216 goteof:
217 		return 0;
218 	}
219 	if(b->n == 0 && noblock) {
220 		errno = EAGAIN;
221 		return -1;
222 	}
223 	/* make sure there's data */
224 	lock(&mux->lock);
225 	ngot = b->putnext - b->getnext;
226 	if(ngot == 0) {
227 		/* maybe EOF just happened */
228 		if(b->eof) {
229 			unlock(&mux->lock);
230 			goto goteof;
231 		}
232 		/* sleep until there's data */
233 		b->datawait = 1;
234 		unlock(&mux->lock);
235 		_RENDEZVOUS((unsigned long)&b->datawait, 0);
236 		lock(&mux->lock);
237 		ngot = b->putnext - b->getnext;
238 	}
239 	if(ngot == 0) {
240 		unlock(&mux->lock);
241 		goto goteof;
242 	}
243 	if(ngot > nwant)
244 		ngot = nwant;
245 	memcpy(addr, b->getnext, ngot);
246 	b->getnext += ngot;
247 	b->n -= ngot;
248 	if(b->getnext == b->putnext && b->roomwait) {
249 		b->getnext = b->putnext = b->data;
250 		b->roomwait = 0;
251 		unlock(&mux->lock);
252 		/* wake up copy process */
253 		_RENDEZVOUS((unsigned long)&b->roomwait, 0);
254 	} else
255 		unlock(&mux->lock);
256 	return ngot;
257 }
258 
259 int
260 select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
261 {
262 	int n, i, tmp, t, slots, fd;
263 	Fdinfo *f;
264 	Muxbuf *b;
265 
266 	if(timeout)
267 		t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
268 	else
269 		t = -1;
270 	if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
271 			|| (efds && FD_ANYSET(efds)))) {
272 		/* no requested fds */
273 		if(t > 0)
274 			_SLEEP(t);
275 		return 0;
276 	}
277 
278 	/* make sure all requested rfds and efds are buffered */
279 	if(nfds >= OPEN_MAX)
280 		nfds = OPEN_MAX-1;
281 	for(i = 0; i<= nfds; i++)
282 		if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
283 			f = &_fdinfo[i];
284 			if(!(f->flags&FD_BUFFERED))
285 				if(_startbuf(i) != 0) {
286 					return -1;
287 				}
288 			b = f->buf;
289 			if(rfds && FD_ISSET(i,rfds) && b->eof && b->n == 0) {
290 				errno = EBADF;		/* how X tells a client is gone */
291 				return -1;
292 			}
293 		}
294 
295 	/* check wfds;  for now, we'll say they are all ready */
296 	n = 0;
297 	if(wfds && FD_ANYSET(wfds)){
298 		for(i = 0; i<nfds; i++)
299 			if(FD_ISSET(i, wfds)) {
300 				n++;
301 			}
302 	}
303 
304 	lock(&mux->lock);
305 
306 	slots = mux->curfds;
307 	FD_ZERO(&mux->rwant);
308 	FD_ZERO(&mux->ewant);
309 
310 	for(i = 0; i<slots; i++) {
311 		b = &mux->bufs[i];
312 		fd = b->fd;
313 		if(fd == -1)
314 			continue;
315 		if(rfds && FD_ISSET(fd, rfds)) {
316 			if(b->n > 0 || b->eof)
317 				n++;
318 			else{
319 				FD_CLR(fd, rfds);
320 				FD_SET(fd, &mux->rwant);
321 			}
322 		}
323 		if(efds && FD_ISSET(fd, efds)) {
324 			if(b->eof && b->n == 0)
325 				n++;
326 			else{
327 				FD_CLR(fd, efds);
328 				FD_SET(fd, &mux->ewant);
329 			}
330 		}
331 	}
332 	if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
333 		FD_ZERO(&mux->rwant);
334 		FD_ZERO(&mux->ewant);
335 		unlock(&mux->lock);
336 		return n;
337 	}
338 
339 	if(timeout) {
340 		mux->waittime = t;
341 		if(timerpid == -1)
342 			_timerproc();
343 		else
344 			_resettimer();
345 	}
346 	mux->selwait = 1;
347 	unlock(&mux->lock);
348 	fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
349 	if(fd >= 0) {
350 		b = _fdinfo[fd].buf;
351 		if(FD_ISSET(fd, &mux->rwant)) {
352 			FD_SET(fd, rfds);
353 			n = 1;
354 		} else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
355 			FD_SET(fd, efds);
356 			n = 1;
357 		}
358 	}
359 	FD_ZERO(&mux->rwant);
360 	FD_ZERO(&mux->ewant);
361 	return n;
362 }
363 
/*
 * Set by the SIGALRM handler in the timer process to say that the
 * wait time was changed while it slept.  Written from a signal
 * handler and polled from the timer loop, hence volatile sig_atomic_t.
 */
static volatile sig_atomic_t timerreset;

/* SIGALRM handler for the timer process: just note the reset */
static void
alarmed(int v)
{
	(void)v;
	timerreset = 1;
}

/* a little over an hour */
#define LONGWAIT 4000001
374 
375 static void
376 _timerproc(void)
377 {
378 	int i;
379 
380 	if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
381 		/* timer process */
382 		setpgid(getpid(), _muxsid);
383 		signal(SIGALRM, alarmed);
384 		for(i=0; i<OPEN_MAX; i++)
385 				_CLOSE(i);
386 		_RENDEZVOUS(1, 0);
387 		for(;;) {
388 			_SLEEP(mux->waittime);
389 			if(timerreset) {
390 				timerreset = 0;
391 			} else {
392 				lock(&mux->lock);
393 				if(mux->selwait && mux->waittime != LONGWAIT) {
394 					mux->selwait = 0;
395 					mux->waittime = LONGWAIT;
396 					unlock(&mux->lock);
397 					_RENDEZVOUS((unsigned long)&mux->selwait, -2);
398 				} else {
399 					mux->waittime = LONGWAIT;
400 					unlock(&mux->lock);
401 				}
402 			}
403 		}
404 	}
405 	/* parent process continues */
406 	_RENDEZVOUS(1, 0);
407 }
408 
409 static void
410 _resettimer(void)
411 {
412 	kill(timerpid, SIGALRM);
413 }
414 
415 void
416 _killmuxsid(void)
417 {
418 	if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
419 		kill(-_muxsid,SIGTERM);
420 }
421 
422 /* call this on fork(), because reading a BUFFERED fd won't work in child */
423 void
424 _detachbuf(void)
425 {
426 	int i;
427 	Fdinfo *f;
428 
429 	if(mux == 0)
430 		return;
431 	_SEGDETACH(mux);
432 	for(i = 0; i < OPEN_MAX; i++){
433 		f = &_fdinfo[i];
434 		if(f->flags&FD_BUFFERED)
435 			f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
436 				/* mark 'poisoned' */
437 	}
438 	mux = 0;
439 	_muxsid = -1;
440 	_mainpid = -1;
441 	timerpid = -1;
442 }
443 
444 static int
445 copynotehandler(void *u, char *msg)
446 {
447 	int i;
448 	void(*f)(int);
449 
450 	if(_finishing)
451 		_finish(0, 0);
452 	_NOTED(1);
453 }
454