xref: /plan9-contrib/sys/src/ape/lib/ap/plan9/_buf.c (revision 7dd7cddf99dd7472612f1413b4da293630e6b1bc)
1 #define  _BSDTIME_EXTENSION
2 #define _LOCK_EXTENSION
3 #include "lib.h"
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <signal.h>
8 #include <string.h>
9 #include <stdio.h>
10 #include <lock.h>
11 #include <sys/time.h>
12 #include <select.h>
13 #include "sys9.h"
14 
15 typedef struct Muxseg {
16 	Lock		lock;				/* for mutual exclusion access to buffer variables */
17 	int		curfds;			/* number of fds currently buffered */
18 	int		selwait;			/* true if selecting process is waiting */
19 	int		waittime;			/* time for timer process to wait */
20 	fd_set	rwant;			/* fd's that select wants to read */
21 	fd_set	ewant;			/* fd's that select wants to know eof info on */
22 	Muxbuf	bufs[INITBUFS];	/* can grow, via segbrk() */
23 } Muxseg;
24 
25 #define MUXADDR ((void*)0x6000000)
26 static Muxseg *mux = 0;			/* shared memory segment */
27 
28 /* _muxsid and _killmuxsid are known in libbsd's listen.c */
29 int _muxsid = -1;			/* group id of copy processes */
30 static int _mainpid = -1;
31 static int timerpid = -1;			/* pid of a timer process */
32 
33 void _killmuxsid(void);
34 static void _copyproc(int, Muxbuf*);
35 static void _timerproc(void);
36 static void _resettimer(void);
37 
38 static int copynotehandler(void *, char *);
39 
40 /* assume FD_SETSIZE is 96 */
41 #define FD_ANYSET(p)	((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
42 
43 
44 /*
45  * Start making fd read-buffered: make the shared segment, if necessary,
46  * allocate a slot (index into mux->bufs), and fork a child to read the fd
47  * and write into the slot-indexed buffer.
48  * Return -1 if we can't do it.
49  */
50 int
51 _startbuf(int fd)
52 {
53 	long i, n, slot;
54 	int pid, sid;
55 	Fdinfo *f;
56 	Muxbuf *b;
57 
58 	if(mux == 0){
59 		_RFORK(RFREND);
60 		mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
61 		if((long)mux == -1){
62 			_syserrno();
63 			return -1;
64 		}
65 		/* segattach has returned zeroed memory */
66 		atexit(_killmuxsid);
67 	}
68 
69 	slot = mux->curfds++;
70 	if(mux->curfds > INITBUFS) {
71 		if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
72 			_syserrno();
73 			return -1;
74 		}
75 	}
76 
77 	f = &_fdinfo[fd];
78 	b = &mux->bufs[slot];
79 	b->n = 0;
80 	b->putnext = b->data;
81 	b->getnext = b->data;
82 	b->eof = 0;
83 	b->fd = fd;
84 	if(_mainpid == -1)
85 		_mainpid = getpid();
86 	if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
87 		/* copy process ... */
88 		if(_muxsid == -1) {
89 			_RFORK(RFNOTEG);
90 			_muxsid = getpgrp();
91 		} else
92 			setpgid(getpid(), _muxsid);
93 		_NOTIFY(copynotehandler);
94 		for(i=0; i<OPEN_MAX; i++)
95 			if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
96 				_CLOSE(i);
97 		_RENDEZVOUS(0, _muxsid);
98 		_copyproc(fd, b);
99 	}
100 
101 	/* parent process continues ... */
102 	b->copypid = pid;
103 	f->buf = b;
104 	f->flags |= FD_BUFFERED;
105 	_muxsid = _RENDEZVOUS(0, 0);
106 	/* leave fd open in parent so system doesn't reuse it */
107 	return 0;
108 }
109 
110 /*
111  * The given buffered fd is being closed.
112  * Set the fd field in the shared buffer to -1 to tell copyproc
113  * to exit, and kill the copyproc.
114  */
115 void
116 _closebuf(int fd)
117 {
118 	Muxbuf *b;
119 
120 	b = _fdinfo[fd].buf;
121 	if(!b)
122 		return;
123 	lock(&mux->lock);
124 	b->fd = -1;
125 	unlock(&mux->lock);
126 	kill(b->copypid, SIGKILL);
127 }
128 
129 /* child copy procs execute this until eof */
130 static void
131 _copyproc(int fd, Muxbuf *b)
132 {
133 	unsigned char *e;
134 	int n;
135 	int nzeros;
136 
137 	e = &b->data[PERFDMAX];
138 	for(;;) {
139 		/* make sure there's room */
140 		lock(&mux->lock);
141 		if(e - b->putnext < READMAX) {
142 			if(b->getnext == b->putnext) {
143 				b->getnext = b->putnext = b->data;
144 				unlock(&mux->lock);
145 			} else {
146 				/* sleep until there's room */
147 				b->roomwait = 1;
148 				unlock(&mux->lock);
149 				_RENDEZVOUS((unsigned long)&b->roomwait, 0);
150 			}
151 		} else
152 			unlock(&mux->lock);
153 		/*
154 		 * A Zero-length _READ might mean a zero-length write
155 		 * happened, or it might mean eof; try several times to
156 		 * disambiguate (posix read() discards 0-length messages)
157 		 */
158 		nzeros = 0;
159 		do {
160 			n = _READ(fd, b->putnext, READMAX);
161 			if(b->fd == -1) {
162 				_exit(0);		/* we've been closed */
163 			}
164 		} while(n == 0 && ++nzeros < 3);
165 		lock(&mux->lock);
166 		if(n <= 0) {
167 			b->eof = 1;
168 			if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
169 				mux->selwait = 0;
170 				unlock(&mux->lock);
171 				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
172 			} else if(b->datawait) {
173 				b->datawait = 0;
174 				unlock(&mux->lock);
175 				_RENDEZVOUS((unsigned long)&b->datawait, 0);
176 			} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
177 				mux->selwait = 0;
178 				unlock(&mux->lock);
179 				_RENDEZVOUS((unsigned long)&mux->selwait, fd);
180 			} else
181 				unlock(&mux->lock);
182 			_exit(0);
183 		} else {
184 			b->putnext += n;
185 			b->n += n;
186 			if(b->n > 0) {
187 				/* parent process cannot be both in datawait and selwait */
188 				if(b->datawait) {
189 					b->datawait = 0;
190 					unlock(&mux->lock);
191 					/* wake up _bufreading process */
192 					_RENDEZVOUS((unsigned long)&b->datawait, 0);
193 				} else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
194 					mux->selwait = 0;
195 					unlock(&mux->lock);
196 					/* wake up selecting process */
197 					_RENDEZVOUS((unsigned long)&mux->selwait, fd);
198 				} else
199 					unlock(&mux->lock);
200 			} else
201 				unlock(&mux->lock);
202 		}
203 	}
204 }
205 
206 /* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
207 int
208 _readbuf(int fd, void *addr, int nwant, int noblock)
209 {
210 	Muxbuf *b;
211 	int ngot;
212 
213 	b = _fdinfo[fd].buf;
214 	if(b->eof && b->n == 0) {
215 goteof:
216 		return 0;
217 	}
218 	if(b->n == 0 && noblock) {
219 		errno = EAGAIN;
220 		return -1;
221 	}
222 	/* make sure there's data */
223 	lock(&mux->lock);
224 	ngot = b->putnext - b->getnext;
225 	if(ngot == 0) {
226 		/* maybe EOF just happened */
227 		if(b->eof) {
228 			unlock(&mux->lock);
229 			goto goteof;
230 		}
231 		/* sleep until there's data */
232 		b->datawait = 1;
233 		unlock(&mux->lock);
234 		_RENDEZVOUS((unsigned long)&b->datawait, 0);
235 		lock(&mux->lock);
236 		ngot = b->putnext - b->getnext;
237 	}
238 	if(ngot == 0) {
239 		unlock(&mux->lock);
240 		goto goteof;
241 	}
242 	if(ngot > nwant)
243 		ngot = nwant;
244 	memcpy(addr, b->getnext, ngot);
245 	b->getnext += ngot;
246 	b->n -= ngot;
247 	if(b->getnext == b->putnext && b->roomwait) {
248 		b->getnext = b->putnext = b->data;
249 		b->roomwait = 0;
250 		unlock(&mux->lock);
251 		/* wake up copy process */
252 		_RENDEZVOUS((unsigned long)&b->roomwait, 0);
253 	} else
254 		unlock(&mux->lock);
255 	return ngot;
256 }
257 
258 int
259 select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
260 {
261 	int n, i, tmp, t, slots, fd;
262 	Fdinfo *f;
263 	Muxbuf *b;
264 
265 	if(timeout)
266 		t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
267 	else
268 		t = -1;
269 	if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
270 			|| (efds && FD_ANYSET(efds)))) {
271 		/* no requested fds */
272 		if(t > 0)
273 			_SLEEP(t);
274 		return 0;
275 	}
276 
277 	/* make sure all requested rfds and efds are buffered */
278 	if(nfds >= OPEN_MAX)
279 		nfds = OPEN_MAX-1;
280 	for(i = 0; i<= nfds; i++)
281 		if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
282 			f = &_fdinfo[i];
283 			if(!(f->flags&FD_BUFFERED))
284 				if(_startbuf(i) != 0) {
285 					return -1;
286 				}
287 			b = f->buf;
288 			if(rfds && FD_ISSET(i,rfds) && b->eof && b->n == 0) {
289 				errno = EBADF;		/* how X tells a client is gone */
290 				return -1;
291 			}
292 		}
293 
294 	/* check wfds;  for now, we'll say they are all ready */
295 	n = 0;
296 	if(wfds && FD_ANYSET(wfds)){
297 		for(i = 0; i<nfds; i++)
298 			if(FD_ISSET(i, wfds)) {
299 				n++;
300 			}
301 	}
302 
303 	lock(&mux->lock);
304 
305 	slots = mux->curfds;
306 	FD_ZERO(&mux->rwant);
307 	FD_ZERO(&mux->ewant);
308 
309 	for(i = 0; i<slots; i++) {
310 		b = &mux->bufs[i];
311 		fd = b->fd;
312 		if(fd == -1)
313 			continue;
314 		if(rfds && FD_ISSET(fd, rfds)) {
315 			if(b->n > 0 || b->eof)
316 				n++;
317 			else{
318 				FD_CLR(fd, rfds);
319 				FD_SET(fd, &mux->rwant);
320 			}
321 		}
322 		if(efds && FD_ISSET(fd, efds)) {
323 			if(b->eof && b->n == 0)
324 				n++;
325 			else{
326 				FD_CLR(fd, efds);
327 				FD_SET(fd, &mux->ewant);
328 			}
329 		}
330 	}
331 	if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
332 		FD_ZERO(&mux->rwant);
333 		FD_ZERO(&mux->ewant);
334 		unlock(&mux->lock);
335 		return n;
336 	}
337 
338 	if(timeout) {
339 		mux->waittime = t;
340 		if(timerpid == -1)
341 			_timerproc();
342 		else
343 			_resettimer();
344 	}
345 	mux->selwait = 1;
346 	unlock(&mux->lock);
347 	fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
348 	if(fd >= 0) {
349 		b = _fdinfo[fd].buf;
350 		if(FD_ISSET(fd, &mux->rwant)) {
351 			FD_SET(fd, rfds);
352 			n = 1;
353 		} else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
354 			FD_SET(fd, efds);
355 			n = 1;
356 		}
357 	}
358 	FD_ZERO(&mux->rwant);
359 	FD_ZERO(&mux->ewant);
360 	return n;
361 }
362 
363 static int timerreset;
364 
365 static void
366 alarmed(int v)
367 {
368 	timerreset = 1;
369 }
370 
371 /* a little over an hour */
372 #define LONGWAIT 4000001
373 
374 static void
375 _timerproc(void)
376 {
377 	int i;
378 
379 	if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
380 		/* timer process */
381 		setpgid(getpid(), _muxsid);
382 		signal(SIGALRM, alarmed);
383 		for(i=0; i<OPEN_MAX; i++)
384 				_CLOSE(i);
385 		_RENDEZVOUS(1, 0);
386 		for(;;) {
387 			_SLEEP(mux->waittime);
388 			if(timerreset) {
389 				timerreset = 0;
390 			} else {
391 				lock(&mux->lock);
392 				if(mux->selwait && mux->waittime != LONGWAIT) {
393 					mux->selwait = 0;
394 					mux->waittime = LONGWAIT;
395 					unlock(&mux->lock);
396 					_RENDEZVOUS((unsigned long)&mux->selwait, -2);
397 				} else {
398 					mux->waittime = LONGWAIT;
399 					unlock(&mux->lock);
400 				}
401 			}
402 		}
403 	}
404 	/* parent process continues */
405 	_RENDEZVOUS(1, 0);
406 }
407 
408 static void
409 _resettimer(void)
410 {
411 	kill(timerpid, SIGALRM);
412 }
413 
414 void
415 _killmuxsid(void)
416 {
417 	if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
418 		kill(-_muxsid,SIGTERM);
419 }
420 
421 /* call this on fork(), because reading a BUFFERED fd won't work in child */
422 void
423 _detachbuf(void)
424 {
425 	int i;
426 	Fdinfo *f;
427 
428 	if(mux == 0)
429 		return;
430 	_SEGDETACH(mux);
431 	for(i = 0; i < OPEN_MAX; i++){
432 		f = &_fdinfo[i];
433 		if(f->flags&FD_BUFFERED)
434 			f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
435 				/* mark 'poisoned' */
436 	}
437 	mux = 0;
438 	_muxsid = -1;
439 	_mainpid = -1;
440 	timerpid = -1;
441 }
442 
443 static int
444 copynotehandler(void *u, char *msg)
445 {
446 	int i;
447 	void(*f)(int);
448 
449 	if(_finishing)
450 		_finish(0, 0);
451 	_NOTED(1);
452 }
453