1 #define _BSDTIME_EXTENSION
2 #define _LOCK_EXTENSION
3 #include "lib.h"
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <signal.h>
8 #include <string.h>
9 #include <stdio.h>
10 #include <lock.h>
11 #include <sys/time.h>
12 #include <sys/select.h>
13 #include <unistd.h>
14 #include "sys9.h"
15
16 typedef struct Muxseg {
17 Lock lock; /* for mutual exclusion access to buffer variables */
18 int curfds; /* number of fds currently buffered */
19 int selwait; /* true if selecting process is waiting */
20 int waittime; /* time for timer process to wait */
21 fd_set rwant; /* fd's that select wants to read */
22 fd_set ewant; /* fd's that select wants to know eof info on */
23 Muxbuf bufs[INITBUFS]; /* can grow, via segbrk() */
24 } Muxseg;
25
26 #define MUXADDR ((void*)0x6000000)
27 static Muxseg *mux = 0; /* shared memory segment */
28
29 /* _muxsid and _killmuxsid are known in libbsd's listen.c */
30 int _muxsid = -1; /* group id of copy processes */
31 static int _mainpid = -1;
32 static int timerpid = -1; /* pid of a timer process */
33
34 void _killmuxsid(void);
35 static void _copyproc(int, Muxbuf*);
36 static void _timerproc(void);
37 static void _resettimer(void);
38
39 static int copynotehandler(void *, char *);
40
41 /* assume FD_SETSIZE is 96 */
42 #define FD_ANYSET(p) ((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2])
43
44 /*
45 * Start making fd read-buffered: make the shared segment, if necessary,
46 * allocate a slot (index into mux->bufs), and fork a child to read the fd
47 * and write into the slot-indexed buffer.
48 * Return -1 if we can't do it.
49 */
50 int
_startbuf(int fd)51 _startbuf(int fd)
52 {
53 long i, slot;
54 int pid;
55 Fdinfo *f;
56 Muxbuf *b;
57
58 if(mux == 0){
59 _RFORK(RFREND);
60 mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg));
61 if((long)mux == -1){
62 _syserrno();
63 return -1;
64 }
65 /* segattach has returned zeroed memory */
66 atexit(_killmuxsid);
67 }
68
69 if(fd == -1)
70 return 0;
71
72 lock(&mux->lock);
73 slot = mux->curfds++;
74 if(mux->curfds > INITBUFS) {
75 if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){
76 _syserrno();
77 unlock(&mux->lock);
78 return -1;
79 }
80 }
81
82 f = &_fdinfo[fd];
83 b = &mux->bufs[slot];
84 b->n = 0;
85 b->putnext = b->data;
86 b->getnext = b->data;
87 b->eof = 0;
88 b->fd = fd;
89 if(_mainpid == -1)
90 _mainpid = getpid();
91 if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
92 /* copy process ... */
93 if(_muxsid == -1) {
94 _RFORK(RFNOTEG);
95 _muxsid = getpgrp();
96 } else
97 setpgid(getpid(), _muxsid);
98 _NOTIFY(copynotehandler);
99 for(i=0; i<OPEN_MAX; i++)
100 if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN))
101 _CLOSE(i);
102 _RENDEZVOUS(0, _muxsid);
103 _copyproc(fd, b);
104 }
105
106 /* parent process continues ... */
107 b->copypid = pid;
108 f->buf = b;
109 f->flags |= FD_BUFFERED;
110 unlock(&mux->lock);
111 _muxsid = _RENDEZVOUS(0, 0);
112 /* leave fd open in parent so system doesn't reuse it */
113 return 0;
114 }
115
116 /*
117 * The given buffered fd is being closed.
118 * Set the fd field in the shared buffer to -1 to tell copyproc
119 * to exit, and kill the copyproc.
120 */
121 void
_closebuf(int fd)122 _closebuf(int fd)
123 {
124 Muxbuf *b;
125
126 b = _fdinfo[fd].buf;
127 if(!b)
128 return;
129 lock(&mux->lock);
130 b->fd = -1;
131 unlock(&mux->lock);
132 kill(b->copypid, SIGKILL);
133 }
134
135 /* child copy procs execute this until eof */
136 static void
_copyproc(int fd,Muxbuf * b)137 _copyproc(int fd, Muxbuf *b)
138 {
139 unsigned char *e;
140 int n;
141 int nzeros;
142
143 e = &b->data[PERFDMAX];
144 for(;;) {
145 /* make sure there's room */
146 lock(&mux->lock);
147 if(e - b->putnext < READMAX) {
148 if(b->getnext == b->putnext) {
149 b->getnext = b->putnext = b->data;
150 unlock(&mux->lock);
151 } else {
152 /* sleep until there's room */
153 b->roomwait = 1;
154 unlock(&mux->lock);
155 _RENDEZVOUS((unsigned long)&b->roomwait, 0);
156 }
157 } else
158 unlock(&mux->lock);
159 /*
160 * A Zero-length _READ might mean a zero-length write
161 * happened, or it might mean eof; try several times to
162 * disambiguate (posix read() discards 0-length messages)
163 */
164 nzeros = 0;
165 do {
166 n = _READ(fd, b->putnext, READMAX);
167 if(b->fd == -1) {
168 _exit(0); /* we've been closed */
169 }
170 } while(n == 0 && ++nzeros < 3);
171 lock(&mux->lock);
172 if(n <= 0) {
173 b->eof = 1;
174 if(mux->selwait && FD_ISSET(fd, &mux->ewant)) {
175 mux->selwait = 0;
176 unlock(&mux->lock);
177 _RENDEZVOUS((unsigned long)&mux->selwait, fd);
178 } else if(b->datawait) {
179 b->datawait = 0;
180 unlock(&mux->lock);
181 _RENDEZVOUS((unsigned long)&b->datawait, 0);
182 } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
183 mux->selwait = 0;
184 unlock(&mux->lock);
185 _RENDEZVOUS((unsigned long)&mux->selwait, fd);
186 } else
187 unlock(&mux->lock);
188 _exit(0);
189 } else {
190 b->putnext += n;
191 b->n += n;
192 if(b->n > 0) {
193 /* parent process cannot be both in datawait and selwait */
194 if(b->datawait) {
195 b->datawait = 0;
196 unlock(&mux->lock);
197 /* wake up _bufreading process */
198 _RENDEZVOUS((unsigned long)&b->datawait, 0);
199 } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) {
200 mux->selwait = 0;
201 unlock(&mux->lock);
202 /* wake up selecting process */
203 _RENDEZVOUS((unsigned long)&mux->selwait, fd);
204 } else
205 unlock(&mux->lock);
206 } else
207 unlock(&mux->lock);
208 }
209 }
210 }
211
212 /* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */
213 int
_readbuf(int fd,void * addr,int nwant,int noblock)214 _readbuf(int fd, void *addr, int nwant, int noblock)
215 {
216 Muxbuf *b;
217 int ngot;
218
219 b = _fdinfo[fd].buf;
220 if(b->eof && b->n == 0) {
221 goteof:
222 return 0;
223 }
224 if(b->n == 0 && noblock) {
225 errno = EAGAIN;
226 return -1;
227 }
228 /* make sure there's data */
229 lock(&mux->lock);
230 ngot = b->putnext - b->getnext;
231 if(ngot == 0) {
232 /* maybe EOF just happened */
233 if(b->eof) {
234 unlock(&mux->lock);
235 goto goteof;
236 }
237 /* sleep until there's data */
238 b->datawait = 1;
239 unlock(&mux->lock);
240 _RENDEZVOUS((unsigned long)&b->datawait, 0);
241 lock(&mux->lock);
242 ngot = b->putnext - b->getnext;
243 }
244 if(ngot == 0) {
245 unlock(&mux->lock);
246 goto goteof;
247 }
248 if(ngot > nwant)
249 ngot = nwant;
250 memcpy(addr, b->getnext, ngot);
251 b->getnext += ngot;
252 b->n -= ngot;
253 if(b->getnext == b->putnext && b->roomwait) {
254 b->getnext = b->putnext = b->data;
255 b->roomwait = 0;
256 unlock(&mux->lock);
257 /* wake up copy process */
258 _RENDEZVOUS((unsigned long)&b->roomwait, 0);
259 } else
260 unlock(&mux->lock);
261 return ngot;
262 }
263
264 int
select(int nfds,fd_set * rfds,fd_set * wfds,fd_set * efds,struct timeval * timeout)265 select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout)
266 {
267 int n, i, t, slots, fd, err;
268 Fdinfo *f;
269 Muxbuf *b;
270
271 if(timeout)
272 t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000;
273 else
274 t = -1;
275 if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds))
276 || (efds && FD_ANYSET(efds)))) {
277 /* no requested fds */
278 if(t > 0)
279 _SLEEP(t);
280 return 0;
281 }
282
283 _startbuf(-1);
284
285 /* make sure all requested rfds and efds are buffered */
286 if(nfds >= OPEN_MAX)
287 nfds = OPEN_MAX;
288 for(i = 0; i < nfds; i++)
289 if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){
290 f = &_fdinfo[i];
291 if(!(f->flags&FD_BUFFERED))
292 if(_startbuf(i) != 0)
293 return -1;
294 }
295
296 /* check wfds; for now, we'll say they are all ready */
297 n = 0;
298 if(wfds && FD_ANYSET(wfds)){
299 for(i = 0; i<nfds; i++)
300 if(FD_ISSET(i, wfds)) {
301 n++;
302 }
303 }
304
305 lock(&mux->lock);
306
307 slots = mux->curfds;
308 FD_ZERO(&mux->rwant);
309 FD_ZERO(&mux->ewant);
310
311 for(i = 0; i<slots; i++) {
312 b = &mux->bufs[i];
313 fd = b->fd;
314 if(fd == -1)
315 continue;
316 err = 0;
317 if(efds && FD_ISSET(fd, efds)) {
318 if(b->eof && b->n == 0){
319 err = 1;
320 n++;
321 }else{
322 FD_CLR(fd, efds);
323 FD_SET(fd, &mux->ewant);
324 }
325 }
326 if(rfds && FD_ISSET(fd, rfds)) {
327 if(!err && (b->n > 0 || b->eof))
328 n++;
329 else{
330 FD_CLR(fd, rfds);
331 FD_SET(fd, &mux->rwant);
332 }
333 }
334 }
335 if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) {
336 FD_ZERO(&mux->rwant);
337 FD_ZERO(&mux->ewant);
338 unlock(&mux->lock);
339 return n;
340 }
341
342 if(timeout) {
343 mux->waittime = t;
344 if(timerpid == -1)
345 _timerproc();
346 else
347 _resettimer();
348 }
349 mux->selwait = 1;
350 unlock(&mux->lock);
351 fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0);
352 if(fd >= 0) {
353 b = _fdinfo[fd].buf;
354 if(FD_ISSET(fd, &mux->rwant)) {
355 FD_SET(fd, rfds);
356 n = 1;
357 } else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) {
358 FD_SET(fd, efds);
359 n = 1;
360 }
361 }
362 FD_ZERO(&mux->rwant);
363 FD_ZERO(&mux->ewant);
364 return n;
365 }
366
367 static int timerreset;
368 static int timerpid;
369
370 static void
alarmed(int)371 alarmed(int)
372 {
373 timerreset = 1;
374 }
375
376 /* a little over an hour */
377 #define LONGWAIT 4000001
378
379 static void
_killtimerproc(void)380 _killtimerproc(void)
381 {
382 if(timerpid > 0)
383 kill(timerpid, SIGKILL);
384 }
385
386 static void
_timerproc(void)387 _timerproc(void)
388 {
389 int i;
390
391 if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){
392 /* timer process */
393 setpgid(getpid(), _muxsid);
394 signal(SIGALRM, alarmed);
395 for(i=0; i<OPEN_MAX; i++)
396 _CLOSE(i);
397 _RENDEZVOUS(1, 0);
398 for(;;) {
399 _SLEEP(mux->waittime);
400 if(timerreset) {
401 timerreset = 0;
402 } else {
403 lock(&mux->lock);
404 if(mux->selwait && mux->waittime != LONGWAIT) {
405 mux->selwait = 0;
406 mux->waittime = LONGWAIT;
407 unlock(&mux->lock);
408 _RENDEZVOUS((unsigned long)&mux->selwait, -2);
409 } else {
410 mux->waittime = LONGWAIT;
411 unlock(&mux->lock);
412 }
413 }
414 }
415 }
416 atexit(_killtimerproc);
417 /* parent process continues */
418 _RENDEZVOUS(1, 0);
419 }
420
421 static void
_resettimer(void)422 _resettimer(void)
423 {
424 kill(timerpid, SIGALRM);
425 }
426
427 void
_killmuxsid(void)428 _killmuxsid(void)
429 {
430 if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1))
431 kill(-_muxsid,SIGTERM);
432 }
433
434 /* call this on fork(), because reading a BUFFERED fd won't work in child */
435 void
_detachbuf(void)436 _detachbuf(void)
437 {
438 int i;
439 Fdinfo *f;
440
441 if(mux == 0)
442 return;
443 _SEGDETACH(mux);
444 for(i = 0; i < OPEN_MAX; i++){
445 f = &_fdinfo[i];
446 if(f->flags&FD_BUFFERED)
447 f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX;
448 /* mark 'poisoned' */
449 }
450 mux = 0;
451 _muxsid = -1;
452 _mainpid = -1;
453 timerpid = -1;
454 }
455
456 static int
copynotehandler(void *,char *)457 copynotehandler(void *, char *)
458 {
459 if(_finishing)
460 _finish(0, 0);
461 _NOTED(1);
462 return 0;
463 }
464