xref: /plan9-contrib/sys/src/ape/lib/ap/plan9/_buf.c (revision a84536681645e23c630ce4ef2e5c3b284d4c590b)
1 #define  _BSDTIME_EXTENSION
2 #define _LOCK_EXTENSION
3 #include "lib.h"
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <signal.h>
8 #include <string.h>
9 #include <stdio.h>
10 #include <lock.h>
11 #include <sys/time.h>
12 #include <sys/select.h>
13 #include <unistd.h>
14 #include "sys9.h"
15 
16 typedef struct Muxseg {
17 	Lock	lock;			/* for mutual exclusion access to buffer variables */
18 	int	curfds;			/* number of fds currently buffered */
19 	int	selwait;		/* true if selecting process is waiting */
20 	int	waittime;		/* time for timer process to wait */
21 	fd_set	rwant;			/* fd's that select wants to read */
22 	fd_set	ewant;			/* fd's that select wants to know eof info on */
23 	Muxbuf	bufs[INITBUFS];		/* can grow, via segbrk() */
24 } Muxseg;
25 
26 #define MUXADDR ((void*)0x6000000)
27 static Muxseg *mux = 0;			/* shared memory segment */
28 
29 /* _muxsid and _killmuxsid are known in libbsd's listen.c */
30 int _muxsid = -1;			/* group id of copy processes */
31 static int _mainpid = -1;
32 static int timerpid = -1;		/* pid of a timer process */
33 
34 void _killmuxsid(void);
35 static void _copyproc(int, Muxbuf*);
36 static void _timerproc(void);
37 static void _resettimer(void);
38 
39 static int copynotehandler(void *, char *);
40 
41 /* assume FD_SETSIZE is 96 */
42 #define FD_ANYSET(p)	((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
43 
44 /*
45  * Start making fd read-buffered: make the shared segment, if necessary,
46  * allocate a slot (index into mux->bufs), and fork a child to read the fd
47  * and write into the slot-indexed buffer.
48  * Return -1 if we can't do it.
49  */
50 int
51 _startbuf(int fd)
52 {
53 	long i, n, slot;
54 	int pid, sid;
55 	Fdinfo *f;
56 	Muxbuf *b;
57 
58 	if(mux == 0){
59 		_RFORK(RFREND);
60 		mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
61 		if((long)mux == -1){
62 			_syserrno();
63 			return -1;
64 		}
65 		/* segattach has returned zeroed memory */
66 		atexit(_killmuxsid);
67 	}
68 
69 	if(fd == -1)
70 		return 0;
71 
72 	lock(&mux->lock);
73 	slot = mux->curfds++;
74 	if(mux->curfds > INITBUFS) {
75 		if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
76 			_syserrno();
77 			unlock(&mux->lock);
78 			return -1;
79 		}
80 	}
81 
82 	f = &_fdinfo[fd];
83 	b = &mux->bufs[slot];
84 	b->n = 0;
85 	b->putnext = b->data;
86 	b->getnext = b->data;
87 	b->eof = 0;
88 	b->fd = fd;
89 	if(_mainpid == -1)
90 		_mainpid = getpid();
91 	if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
92 		/* copy process ... */
93 		if(_muxsid == -1) {
94 			_RFORK(RFNOTEG);
95 			_muxsid = getpgrp();
96 		} else
97 			setpgid(getpid(), _muxsid);
98 		_NOTIFY(copynotehandler);
99 		for(i=0; i<OPEN_MAX; i++)
100 			if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
101 				_CLOSE(i);
102 		_RENDEZVOUS(0, _muxsid);
103 		_copyproc(fd, b);
104 	}
105 
106 	/* parent process continues ... */
107 	b->copypid = pid;
108 	f->buf = b;
109 	f->flags |= FD_BUFFERED;
110 	unlock(&mux->lock);
111 	_muxsid = _RENDEZVOUS(0, 0);
112 	/* leave fd open in parent so system doesn't reuse it */
113 	return 0;
114 }
115 
116 /*
117  * The given buffered fd is being closed.
118  * Set the fd field in the shared buffer to -1 to tell copyproc
119  * to exit, and kill the copyproc.
120  */
121 void
122 _closebuf(int fd)
123 {
124 	Muxbuf *b;
125 
126 	b = _fdinfo[fd].buf;
127 	if(!b)
128 		return;
129 	lock(&mux->lock);
130 	b->fd = -1;
131 	unlock(&mux->lock);
132 	kill(b->copypid, SIGKILL);
133 }
134 
135 /* child copy procs execute this until eof */
136 static void
137 _copyproc(int fd, Muxbuf *b)
138 {
139 	unsigned char *e;
140 	int n;
141 	int nzeros;
142 
143 	e = &b->data[PERFDMAX];
144 	for(;;) {
145 		/* make sure there's room */
146 		lock(&mux->lock);
147 		if(e - b->putnext < READMAX) {
148 			if(b->getnext == b->putnext) {
149 				b->getnext = b->putnext = b->data;
150 				unlock(&mux->lock);
151 			} else {
152 				/* sleep until there's room */
153 				b->roomwait = 1;
154 				unlock(&mux->lock);
155 				_RENDEZVOUS((unsigned long)&b->roomwait, 0);
156 			}
157 		} else
158 			unlock(&mux->lock);
159 		/*
160 		 * A Zero-length _READ might mean a zero-length write
161 		 * happened, or it might mean eof; try several times to
162 		 * disambiguate (posix read() discards 0-length messages)
163 		 */
164 		nzeros = 0;
165 		do {
166 			n = _READ(fd, b->putnext, READMAX);
167 			if(b->fd == -1) {
168 				_exit(0);		/* we've been closed */
169 			}
170 		} while(n == 0 && ++nzeros < 3);
171 		lock(&mux->lock);
172 		if(n <= 0) {
173 			b->eof = 1;
174 			if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
175 				mux->selwait = 0;
176 				unlock(&mux->lock);
177 				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
178 			} else if(b->datawait) {
179 				b->datawait = 0;
180 				unlock(&mux->lock);
181 				_RENDEZVOUS((unsigned long)&b->datawait, 0);
182 			} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
183 				mux->selwait = 0;
184 				unlock(&mux->lock);
185 				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
186 			} else
187 				unlock(&mux->lock);
188 			_exit(0);
189 		} else {
190 			b->putnext += n;
191 			b->n += n;
192 			if(b->n > 0) {
193 				/* parent process cannot be both in datawait and selwait */
194 				if(b->datawait) {
195 					b->datawait = 0;
196 					unlock(&mux->lock);
197 					/* wake up _bufreading process */
198 					_RENDEZVOUS((unsigned long)&b->datawait, 0);
199 				} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
200 					mux->selwait = 0;
201 					unlock(&mux->lock);
202 					/* wake up selecting process */
203 					_RENDEZVOUS((unsigned long)&mux->selwait, fd);
204 				} else
205 					unlock(&mux->lock);
206 			} else
207 				unlock(&mux->lock);
208 		}
209 	}
210 }
211 
212 /* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
213 int
214 _readbuf(int fd, void *addr, int nwant, int noblock)
215 {
216 	Muxbuf *b;
217 	int ngot;
218 
219 	b = _fdinfo[fd].buf;
220 	if(b->eof && b->n == 0) {
221 goteof:
222 		return 0;
223 	}
224 	if(b->n == 0 && noblock) {
225 		errno = EAGAIN;
226 		return -1;
227 	}
228 	/* make sure there's data */
229 	lock(&mux->lock);
230 	ngot = b->putnext - b->getnext;
231 	if(ngot == 0) {
232 		/* maybe EOF just happened */
233 		if(b->eof) {
234 			unlock(&mux->lock);
235 			goto goteof;
236 		}
237 		/* sleep until there's data */
238 		b->datawait = 1;
239 		unlock(&mux->lock);
240 		_RENDEZVOUS((unsigned long)&b->datawait, 0);
241 		lock(&mux->lock);
242 		ngot = b->putnext - b->getnext;
243 	}
244 	if(ngot == 0) {
245 		unlock(&mux->lock);
246 		goto goteof;
247 	}
248 	if(ngot > nwant)
249 		ngot = nwant;
250 	memcpy(addr, b->getnext, ngot);
251 	b->getnext += ngot;
252 	b->n -= ngot;
253 	if(b->getnext == b->putnext && b->roomwait) {
254 		b->getnext = b->putnext = b->data;
255 		b->roomwait = 0;
256 		unlock(&mux->lock);
257 		/* wake up copy process */
258 		_RENDEZVOUS((unsigned long)&b->roomwait, 0);
259 	} else
260 		unlock(&mux->lock);
261 	return ngot;
262 }
263 
264 int
265 select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
266 {
267 	int n, i, tmp, t, slots, fd, err;
268 	Fdinfo *f;
269 	Muxbuf *b;
270 
271 	if(timeout)
272 		t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
273 	else
274 		t = -1;
275 	if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
276 			|| (efds && FD_ANYSET(efds)))) {
277 		/* no requested fds */
278 		if(t > 0)
279 			_SLEEP(t);
280 		return 0;
281 	}
282 
283 	_startbuf(-1);
284 
285 	/* make sure all requested rfds and efds are buffered */
286 	if(nfds >= OPEN_MAX)
287 		nfds = OPEN_MAX;
288 	for(i = 0; i < nfds; i++)
289 		if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
290 			f = &_fdinfo[i];
291 			if(!(f->flags&FD_BUFFERED))
292 				if(_startbuf(i) != 0) {
293 					return -1;
294 				}
295 			b = f->buf;
296 			if(rfds && FD_ISSET(i,rfds) && b->eof && b->n == 0)
297 			if(efds == 0 || !FD_ISSET(i,efds)) {
298 				errno = EBADF;		/* how X tells a client is gone */
299 				return -1;
300 			}
301 		}
302 
303 	/* check wfds;  for now, we'll say they are all ready */
304 	n = 0;
305 	if(wfds && FD_ANYSET(wfds)){
306 		for(i = 0; i<nfds; i++)
307 			if(FD_ISSET(i, wfds)) {
308 				n++;
309 			}
310 	}
311 
312 	lock(&mux->lock);
313 
314 	slots = mux->curfds;
315 	FD_ZERO(&mux->rwant);
316 	FD_ZERO(&mux->ewant);
317 
318 	for(i = 0; i<slots; i++) {
319 		b = &mux->bufs[i];
320 		fd = b->fd;
321 		if(fd == -1)
322 			continue;
323 		err = 0;
324 		if(efds && FD_ISSET(fd, efds)) {
325 			if(b->eof && b->n == 0){
326 				err = 1;
327 				n++;
328 			}else{
329 				FD_CLR(fd, efds);
330 				FD_SET(fd, &mux->ewant);
331 			}
332 		}
333 		if(rfds && FD_ISSET(fd, rfds)) {
334 			if(!err && (b->n > 0 || b->eof))
335 				n++;
336 			else{
337 				FD_CLR(fd, rfds);
338 				FD_SET(fd, &mux->rwant);
339 			}
340 		}
341 	}
342 	if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
343 		FD_ZERO(&mux->rwant);
344 		FD_ZERO(&mux->ewant);
345 		unlock(&mux->lock);
346 		return n;
347 	}
348 
349 	if(timeout) {
350 		mux->waittime = t;
351 		if(timerpid == -1)
352 			_timerproc();
353 		else
354 			_resettimer();
355 	}
356 	mux->selwait = 1;
357 	unlock(&mux->lock);
358 	fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
359 	if(fd >= 0) {
360 		b = _fdinfo[fd].buf;
361 		if(FD_ISSET(fd, &mux->rwant)) {
362 			FD_SET(fd, rfds);
363 			n = 1;
364 		} else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
365 			FD_SET(fd, efds);
366 			n = 1;
367 		}
368 	}
369 	FD_ZERO(&mux->rwant);
370 	FD_ZERO(&mux->ewant);
371 	return n;
372 }
373 
/*
 * Set from the SIGALRM handler and polled by the timer loop after its
 * sleep returns; volatile sig_atomic_t because it is written from a
 * signal handler.
 */
static volatile sig_atomic_t timerreset;
static int timerpid;

/* SIGALRM handler for the timer process: just records that a reset was requested. */
static void
alarmed(int v)
{
	(void)v;	/* signal number is not needed */
	timerreset = 1;
}
382 
383 /* a little over an hour */
384 #define LONGWAIT 4000001
385 
386 static void
387 _killtimerproc(void)
388 {
389 	if(timerpid > 0)
390 		kill(timerpid, SIGKILL);
391 }
392 
393 static void
394 _timerproc(void)
395 {
396 	int i;
397 
398 	if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
399 		/* timer process */
400 		setpgid(getpid(), _muxsid);
401 		signal(SIGALRM, alarmed);
402 		for(i=0; i<OPEN_MAX; i++)
403 				_CLOSE(i);
404 		_RENDEZVOUS(1, 0);
405 		for(;;) {
406 			_SLEEP(mux->waittime);
407 			if(timerreset) {
408 				timerreset = 0;
409 			} else {
410 				lock(&mux->lock);
411 				if(mux->selwait && mux->waittime != LONGWAIT) {
412 					mux->selwait = 0;
413 					mux->waittime = LONGWAIT;
414 					unlock(&mux->lock);
415 					_RENDEZVOUS((unsigned long)&mux->selwait, -2);
416 				} else {
417 					mux->waittime = LONGWAIT;
418 					unlock(&mux->lock);
419 				}
420 			}
421 		}
422 	}
423 	atexit(_killtimerproc);
424 	/* parent process continues */
425 	_RENDEZVOUS(1, 0);
426 }
427 
428 static void
429 _resettimer(void)
430 {
431 	kill(timerpid, SIGALRM);
432 }
433 
434 void
435 _killmuxsid(void)
436 {
437 	if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
438 		kill(-_muxsid,SIGTERM);
439 }
440 
441 /* call this on fork(), because reading a BUFFERED fd won't work in child */
442 void
443 _detachbuf(void)
444 {
445 	int i;
446 	Fdinfo *f;
447 
448 	if(mux == 0)
449 		return;
450 	_SEGDETACH(mux);
451 	for(i = 0; i < OPEN_MAX; i++){
452 		f = &_fdinfo[i];
453 		if(f->flags&FD_BUFFERED)
454 			f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
455 				/* mark 'poisoned' */
456 	}
457 	mux = 0;
458 	_muxsid = -1;
459 	_mainpid = -1;
460 	timerpid = -1;
461 }
462 
463 static int
464 copynotehandler(void *u, char *msg)
465 {
466 	int i;
467 	void(*f)(int);
468 
469 	if(_finishing)
470 		_finish(0, 0);
471 	_NOTED(1);
472 }
473