xref: /plan9/sys/src/ape/lib/ap/plan9/_buf.c (revision 781103c4074deb8af160e8a0da2742ba6b29dc2b)
1 #define  _BSDTIME_EXTENSION
2 #define _LOCK_EXTENSION
3 #include "lib.h"
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <signal.h>
8 #include <string.h>
9 #include <stdio.h>
10 #include <lock.h>
11 #include <sys/time.h>
12 #include <sys/select.h>
13 #include <unistd.h>
14 #include "sys9.h"
15 
16 typedef struct Muxseg {
17 	Lock	lock;			/* for mutual exclusion access to buffer variables */
18 	int	curfds;			/* number of fds currently buffered */
19 	int	selwait;		/* true if selecting process is waiting */
20 	int	waittime;		/* time for timer process to wait */
21 	fd_set	rwant;			/* fd's that select wants to read */
22 	fd_set	ewant;			/* fd's that select wants to know eof info on */
23 	Muxbuf	bufs[INITBUFS];		/* can grow, via segbrk() */
24 } Muxseg;
25 
26 #define MUXADDR ((void*)0x6000000)
27 static Muxseg *mux = 0;			/* shared memory segment */
28 
29 /* _muxsid and _killmuxsid are known in libbsd's listen.c */
30 int _muxsid = -1;			/* group id of copy processes */
31 static int _mainpid = -1;
32 static int timerpid = -1;		/* pid of a timer process */
33 
34 void _killmuxsid(void);
35 static void _copyproc(int, Muxbuf*);
36 static void _timerproc(void);
37 static void _resettimer(void);
38 
39 static int copynotehandler(void *, char *);
40 
41 /* assume FD_SETSIZE is 96 */
42 #define FD_ANYSET(p)	((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
43 
44 /*
45  * Start making fd read-buffered: make the shared segment, if necessary,
46  * allocate a slot (index into mux->bufs), and fork a child to read the fd
47  * and write into the slot-indexed buffer.
48  * Return -1 if we can't do it.
49  */
50 int
_startbuf(int fd)51 _startbuf(int fd)
52 {
53 	long i, slot;
54 	int pid;
55 	Fdinfo *f;
56 	Muxbuf *b;
57 
58 	if(mux == 0){
59 		_RFORK(RFREND);
60 		mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
61 		if((long)mux == -1){
62 			_syserrno();
63 			return -1;
64 		}
65 		/* segattach has returned zeroed memory */
66 		atexit(_killmuxsid);
67 	}
68 
69 	if(fd == -1)
70 		return 0;
71 
72 	lock(&mux->lock);
73 	slot = mux->curfds++;
74 	if(mux->curfds > INITBUFS) {
75 		if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
76 			_syserrno();
77 			unlock(&mux->lock);
78 			return -1;
79 		}
80 	}
81 
82 	f = &_fdinfo[fd];
83 	b = &mux->bufs[slot];
84 	b->n = 0;
85 	b->putnext = b->data;
86 	b->getnext = b->data;
87 	b->eof = 0;
88 	b->fd = fd;
89 	if(_mainpid == -1)
90 		_mainpid = getpid();
91 	if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
92 		/* copy process ... */
93 		if(_muxsid == -1) {
94 			_RFORK(RFNOTEG);
95 			_muxsid = getpgrp();
96 		} else
97 			setpgid(getpid(), _muxsid);
98 		_NOTIFY(copynotehandler);
99 		for(i=0; i<OPEN_MAX; i++)
100 			if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
101 				_CLOSE(i);
102 		_RENDEZVOUS(0, _muxsid);
103 		_copyproc(fd, b);
104 	}
105 
106 	/* parent process continues ... */
107 	b->copypid = pid;
108 	f->buf = b;
109 	f->flags |= FD_BUFFERED;
110 	unlock(&mux->lock);
111 	_muxsid = _RENDEZVOUS(0, 0);
112 	/* leave fd open in parent so system doesn't reuse it */
113 	return 0;
114 }
115 
116 /*
117  * The given buffered fd is being closed.
118  * Set the fd field in the shared buffer to -1 to tell copyproc
119  * to exit, and kill the copyproc.
120  */
121 void
_closebuf(int fd)122 _closebuf(int fd)
123 {
124 	Muxbuf *b;
125 
126 	b = _fdinfo[fd].buf;
127 	if(!b)
128 		return;
129 	lock(&mux->lock);
130 	b->fd = -1;
131 	unlock(&mux->lock);
132 	kill(b->copypid, SIGKILL);
133 }
134 
135 /* child copy procs execute this until eof */
136 static void
_copyproc(int fd,Muxbuf * b)137 _copyproc(int fd, Muxbuf *b)
138 {
139 	unsigned char *e;
140 	int n;
141 	int nzeros;
142 
143 	e = &b->data[PERFDMAX];
144 	for(;;) {
145 		/* make sure there's room */
146 		lock(&mux->lock);
147 		if(e - b->putnext < READMAX) {
148 			if(b->getnext == b->putnext) {
149 				b->getnext = b->putnext = b->data;
150 				unlock(&mux->lock);
151 			} else {
152 				/* sleep until there's room */
153 				b->roomwait = 1;
154 				unlock(&mux->lock);
155 				_RENDEZVOUS((unsigned long)&b->roomwait, 0);
156 			}
157 		} else
158 			unlock(&mux->lock);
159 		/*
160 		 * A Zero-length _READ might mean a zero-length write
161 		 * happened, or it might mean eof; try several times to
162 		 * disambiguate (posix read() discards 0-length messages)
163 		 */
164 		nzeros = 0;
165 		do {
166 			n = _READ(fd, b->putnext, READMAX);
167 			if(b->fd == -1) {
168 				_exit(0);		/* we've been closed */
169 			}
170 		} while(n == 0 && ++nzeros < 3);
171 		lock(&mux->lock);
172 		if(n <= 0) {
173 			b->eof = 1;
174 			if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
175 				mux->selwait = 0;
176 				unlock(&mux->lock);
177 				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
178 			} else if(b->datawait) {
179 				b->datawait = 0;
180 				unlock(&mux->lock);
181 				_RENDEZVOUS((unsigned long)&b->datawait, 0);
182 			} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
183 				mux->selwait = 0;
184 				unlock(&mux->lock);
185 				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
186 			} else
187 				unlock(&mux->lock);
188 			_exit(0);
189 		} else {
190 			b->putnext += n;
191 			b->n += n;
192 			if(b->n > 0) {
193 				/* parent process cannot be both in datawait and selwait */
194 				if(b->datawait) {
195 					b->datawait = 0;
196 					unlock(&mux->lock);
197 					/* wake up _bufreading process */
198 					_RENDEZVOUS((unsigned long)&b->datawait, 0);
199 				} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
200 					mux->selwait = 0;
201 					unlock(&mux->lock);
202 					/* wake up selecting process */
203 					_RENDEZVOUS((unsigned long)&mux->selwait, fd);
204 				} else
205 					unlock(&mux->lock);
206 			} else
207 				unlock(&mux->lock);
208 		}
209 	}
210 }
211 
212 /* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
213 int
_readbuf(int fd,void * addr,int nwant,int noblock)214 _readbuf(int fd, void *addr, int nwant, int noblock)
215 {
216 	Muxbuf *b;
217 	int ngot;
218 
219 	b = _fdinfo[fd].buf;
220 	if(b->eof && b->n == 0) {
221 goteof:
222 		return 0;
223 	}
224 	if(b->n == 0 && noblock) {
225 		errno = EAGAIN;
226 		return -1;
227 	}
228 	/* make sure there's data */
229 	lock(&mux->lock);
230 	ngot = b->putnext - b->getnext;
231 	if(ngot == 0) {
232 		/* maybe EOF just happened */
233 		if(b->eof) {
234 			unlock(&mux->lock);
235 			goto goteof;
236 		}
237 		/* sleep until there's data */
238 		b->datawait = 1;
239 		unlock(&mux->lock);
240 		_RENDEZVOUS((unsigned long)&b->datawait, 0);
241 		lock(&mux->lock);
242 		ngot = b->putnext - b->getnext;
243 	}
244 	if(ngot == 0) {
245 		unlock(&mux->lock);
246 		goto goteof;
247 	}
248 	if(ngot > nwant)
249 		ngot = nwant;
250 	memcpy(addr, b->getnext, ngot);
251 	b->getnext += ngot;
252 	b->n -= ngot;
253 	if(b->getnext == b->putnext && b->roomwait) {
254 		b->getnext = b->putnext = b->data;
255 		b->roomwait = 0;
256 		unlock(&mux->lock);
257 		/* wake up copy process */
258 		_RENDEZVOUS((unsigned long)&b->roomwait, 0);
259 	} else
260 		unlock(&mux->lock);
261 	return ngot;
262 }
263 
264 int
select(int nfds,fd_set * rfds,fd_set * wfds,fd_set * efds,struct timeval * timeout)265 select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
266 {
267 	int n, i, t, slots, fd, err;
268 	Fdinfo *f;
269 	Muxbuf *b;
270 
271 	if(timeout)
272 		t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
273 	else
274 		t = -1;
275 	if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
276 			|| (efds && FD_ANYSET(efds)))) {
277 		/* no requested fds */
278 		if(t > 0)
279 			_SLEEP(t);
280 		return 0;
281 	}
282 
283 	_startbuf(-1);
284 
285 	/* make sure all requested rfds and efds are buffered */
286 	if(nfds >= OPEN_MAX)
287 		nfds = OPEN_MAX;
288 	for(i = 0; i < nfds; i++)
289 		if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
290 			f = &_fdinfo[i];
291 			if(!(f->flags&FD_BUFFERED))
292 				if(_startbuf(i) != 0)
293 					return -1;
294 		}
295 
296 	/* check wfds;  for now, we'll say they are all ready */
297 	n = 0;
298 	if(wfds && FD_ANYSET(wfds)){
299 		for(i = 0; i<nfds; i++)
300 			if(FD_ISSET(i, wfds)) {
301 				n++;
302 			}
303 	}
304 
305 	lock(&mux->lock);
306 
307 	slots = mux->curfds;
308 	FD_ZERO(&mux->rwant);
309 	FD_ZERO(&mux->ewant);
310 
311 	for(i = 0; i<slots; i++) {
312 		b = &mux->bufs[i];
313 		fd = b->fd;
314 		if(fd == -1)
315 			continue;
316 		err = 0;
317 		if(efds && FD_ISSET(fd, efds)) {
318 			if(b->eof && b->n == 0){
319 				err = 1;
320 				n++;
321 			}else{
322 				FD_CLR(fd, efds);
323 				FD_SET(fd, &mux->ewant);
324 			}
325 		}
326 		if(rfds && FD_ISSET(fd, rfds)) {
327 			if(!err && (b->n > 0 || b->eof))
328 				n++;
329 			else{
330 				FD_CLR(fd, rfds);
331 				FD_SET(fd, &mux->rwant);
332 			}
333 		}
334 	}
335 	if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
336 		FD_ZERO(&mux->rwant);
337 		FD_ZERO(&mux->ewant);
338 		unlock(&mux->lock);
339 		return n;
340 	}
341 
342 	if(timeout) {
343 		mux->waittime = t;
344 		if(timerpid == -1)
345 			_timerproc();
346 		else
347 			_resettimer();
348 	}
349 	mux->selwait = 1;
350 	unlock(&mux->lock);
351 	fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
352 	if(fd >= 0) {
353 		b = _fdinfo[fd].buf;
354 		if(FD_ISSET(fd, &mux->rwant)) {
355 			FD_SET(fd, rfds);
356 			n = 1;
357 		} else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
358 			FD_SET(fd, efds);
359 			n = 1;
360 		}
361 	}
362 	FD_ZERO(&mux->rwant);
363 	FD_ZERO(&mux->ewant);
364 	return n;
365 }
366 
367 static int timerreset;
368 static int timerpid;
369 
370 static void
alarmed(int)371 alarmed(int)
372 {
373 	timerreset = 1;
374 }
375 
376 /* a little over an hour */
377 #define LONGWAIT 4000001
378 
379 static void
_killtimerproc(void)380 _killtimerproc(void)
381 {
382 	if(timerpid > 0)
383 		kill(timerpid, SIGKILL);
384 }
385 
386 static void
_timerproc(void)387 _timerproc(void)
388 {
389 	int i;
390 
391 	if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
392 		/* timer process */
393 		setpgid(getpid(), _muxsid);
394 		signal(SIGALRM, alarmed);
395 		for(i=0; i<OPEN_MAX; i++)
396 				_CLOSE(i);
397 		_RENDEZVOUS(1, 0);
398 		for(;;) {
399 			_SLEEP(mux->waittime);
400 			if(timerreset) {
401 				timerreset = 0;
402 			} else {
403 				lock(&mux->lock);
404 				if(mux->selwait && mux->waittime != LONGWAIT) {
405 					mux->selwait = 0;
406 					mux->waittime = LONGWAIT;
407 					unlock(&mux->lock);
408 					_RENDEZVOUS((unsigned long)&mux->selwait, -2);
409 				} else {
410 					mux->waittime = LONGWAIT;
411 					unlock(&mux->lock);
412 				}
413 			}
414 		}
415 	}
416 	atexit(_killtimerproc);
417 	/* parent process continues */
418 	_RENDEZVOUS(1, 0);
419 }
420 
421 static void
_resettimer(void)422 _resettimer(void)
423 {
424 	kill(timerpid, SIGALRM);
425 }
426 
427 void
_killmuxsid(void)428 _killmuxsid(void)
429 {
430 	if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
431 		kill(-_muxsid,SIGTERM);
432 }
433 
434 /* call this on fork(), because reading a BUFFERED fd won't work in child */
435 void
_detachbuf(void)436 _detachbuf(void)
437 {
438 	int i;
439 	Fdinfo *f;
440 
441 	if(mux == 0)
442 		return;
443 	_SEGDETACH(mux);
444 	for(i = 0; i < OPEN_MAX; i++){
445 		f = &_fdinfo[i];
446 		if(f->flags&FD_BUFFERED)
447 			f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
448 				/* mark 'poisoned' */
449 	}
450 	mux = 0;
451 	_muxsid = -1;
452 	_mainpid = -1;
453 	timerpid = -1;
454 }
455 
456 static int
copynotehandler(void *,char *)457 copynotehandler(void *, char *)
458 {
459 	if(_finishing)
460 		_finish(0, 0);
461 	_NOTED(1);
462 	return 0;
463 }
464