1 #define _BSDTIME_EXTENSION 2 #define _LOCK_EXTENSION 3 #include "lib.h" 4 #include <stdlib.h> 5 #include <errno.h> 6 #include <unistd.h> 7 #include <signal.h> 8 #include <string.h> 9 #include <stdio.h> 10 #include <lock.h> 11 #include <sys/time.h> 12 #include <sys/select.h> 13 #include <unistd.h> 14 #include "sys9.h" 15 16 typedef struct Muxseg { 17 Lock lock; /* for mutual exclusion access to buffer variables */ 18 int curfds; /* number of fds currently buffered */ 19 int selwait; /* true if selecting process is waiting */ 20 int waittime; /* time for timer process to wait */ 21 fd_set rwant; /* fd's that select wants to read */ 22 fd_set ewant; /* fd's that select wants to know eof info on */ 23 Muxbuf bufs[INITBUFS]; /* can grow, via segbrk() */ 24 } Muxseg; 25 26 #define MUXADDR ((void*)0x6000000) 27 static Muxseg *mux = 0; /* shared memory segment */ 28 29 /* _muxsid and _killmuxsid are known in libbsd's listen.c */ 30 int _muxsid = -1; /* group id of copy processes */ 31 static int _mainpid = -1; 32 static int timerpid = -1; /* pid of a timer process */ 33 34 void _killmuxsid(void); 35 static void _copyproc(int, Muxbuf*); 36 static void _timerproc(void); 37 static void _resettimer(void); 38 39 static int copynotehandler(void *, char *); 40 41 /* assume FD_SETSIZE is 96 */ 42 #define FD_ANYSET(p) ((p)->fds_bits[0] || (p)->fds_bits[1] || (p)->fds_bits[2]) 43 44 /* 45 * Start making fd read-buffered: make the shared segment, if necessary, 46 * allocate a slot (index into mux->bufs), and fork a child to read the fd 47 * and write into the slot-indexed buffer. 48 * Return -1 if we can't do it. 49 */ 50 int 51 _startbuf(int fd) 52 { 53 long i, slot; 54 int pid; 55 Fdinfo *f; 56 Muxbuf *b; 57 58 if(mux == 0){ 59 _RFORK(RFREND); 60 mux = (Muxseg*)_SEGATTACH(0, "shared", MUXADDR, sizeof(Muxseg)); 61 if((long)mux == -1){ 62 _syserrno(); 63 return -1; 64 } 65 /* segattach has returned zeroed memory */ 66 atexit(_killmuxsid); 67 } 68 69 if(fd == -1) 70 return 0; 71 72 lock(&mux->lock); 73 slot = mux->curfds++; 74 if(mux->curfds > INITBUFS) { 75 if(_SEGBRK(mux, mux->bufs+mux->curfds) < 0){ 76 _syserrno(); 77 unlock(&mux->lock); 78 return -1; 79 } 80 } 81 82 f = &_fdinfo[fd]; 83 b = &mux->bufs[slot]; 84 b->n = 0; 85 b->putnext = b->data; 86 b->getnext = b->data; 87 b->eof = 0; 88 b->fd = fd; 89 if(_mainpid == -1) 90 _mainpid = getpid(); 91 if((pid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){ 92 /* copy process ... */ 93 if(_muxsid == -1) { 94 _RFORK(RFNOTEG); 95 _muxsid = getpgrp(); 96 } else 97 setpgid(getpid(), _muxsid); 98 _NOTIFY(copynotehandler); 99 for(i=0; i<OPEN_MAX; i++) 100 if(i!=fd && (_fdinfo[i].flags&FD_ISOPEN)) 101 _CLOSE(i); 102 _RENDEZVOUS(0, _muxsid); 103 _copyproc(fd, b); 104 } 105 106 /* parent process continues ... */ 107 b->copypid = pid; 108 f->buf = b; 109 f->flags |= FD_BUFFERED; 110 unlock(&mux->lock); 111 _muxsid = _RENDEZVOUS(0, 0); 112 /* leave fd open in parent so system doesn't reuse it */ 113 return 0; 114 } 115 116 /* 117 * The given buffered fd is being closed. 118 * Set the fd field in the shared buffer to -1 to tell copyproc 119 * to exit, and kill the copyproc. 120 */ 121 void 122 _closebuf(int fd) 123 { 124 Muxbuf *b; 125 126 b = _fdinfo[fd].buf; 127 if(!b) 128 return; 129 lock(&mux->lock); 130 b->fd = -1; 131 unlock(&mux->lock); 132 kill(b->copypid, SIGKILL); 133 } 134 135 /* child copy procs execute this until eof */ 136 static void 137 _copyproc(int fd, Muxbuf *b) 138 { 139 unsigned char *e; 140 int n; 141 int nzeros; 142 143 e = &b->data[PERFDMAX]; 144 for(;;) { 145 /* make sure there's room */ 146 lock(&mux->lock); 147 if(e - b->putnext < READMAX) { 148 if(b->getnext == b->putnext) { 149 b->getnext = b->putnext = b->data; 150 unlock(&mux->lock); 151 } else { 152 /* sleep until there's room */ 153 b->roomwait = 1; 154 unlock(&mux->lock); 155 _RENDEZVOUS((unsigned long)&b->roomwait, 0); 156 } 157 } else 158 unlock(&mux->lock); 159 /* 160 * A Zero-length _READ might mean a zero-length write 161 * happened, or it might mean eof; try several times to 162 * disambiguate (posix read() discards 0-length messages) 163 */ 164 nzeros = 0; 165 do { 166 n = _READ(fd, b->putnext, READMAX); 167 if(b->fd == -1) { 168 _exit(0); /* we've been closed */ 169 } 170 } while(n == 0 && ++nzeros < 3); 171 lock(&mux->lock); 172 if(n <= 0) { 173 b->eof = 1; 174 if(mux->selwait && FD_ISSET(fd, &mux->ewant)) { 175 mux->selwait = 0; 176 unlock(&mux->lock); 177 _RENDEZVOUS((unsigned long)&mux->selwait, fd); 178 } else if(b->datawait) { 179 b->datawait = 0; 180 unlock(&mux->lock); 181 _RENDEZVOUS((unsigned long)&b->datawait, 0); 182 } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) { 183 mux->selwait = 0; 184 unlock(&mux->lock); 185 _RENDEZVOUS((unsigned long)&mux->selwait, fd); 186 } else 187 unlock(&mux->lock); 188 _exit(0); 189 } else { 190 b->putnext += n; 191 b->n += n; 192 if(b->n > 0) { 193 /* parent process cannot be both in datawait and selwait */ 194 if(b->datawait) { 195 b->datawait = 0; 196 unlock(&mux->lock); 197 /* wake up _bufreading process */ 198 _RENDEZVOUS((unsigned long)&b->datawait, 0); 199 } else if(mux->selwait && FD_ISSET(fd, &mux->rwant)) { 200 mux->selwait = 0; 201 unlock(&mux->lock); 202 /* wake up selecting process */ 203 _RENDEZVOUS((unsigned long)&mux->selwait, fd); 204 } else 205 unlock(&mux->lock); 206 } else 207 unlock(&mux->lock); 208 } 209 } 210 } 211 212 /* like read(), for a buffered fd; extra arg noblock says don't wait for data if true */ 213 int 214 _readbuf(int fd, void *addr, int nwant, int noblock) 215 { 216 Muxbuf *b; 217 int ngot; 218 219 b = _fdinfo[fd].buf; 220 if(b->eof && b->n == 0) { 221 goteof: 222 return 0; 223 } 224 if(b->n == 0 && noblock) { 225 errno = EAGAIN; 226 return -1; 227 } 228 /* make sure there's data */ 229 lock(&mux->lock); 230 ngot = b->putnext - b->getnext; 231 if(ngot == 0) { 232 /* maybe EOF just happened */ 233 if(b->eof) { 234 unlock(&mux->lock); 235 goto goteof; 236 } 237 /* sleep until there's data */ 238 b->datawait = 1; 239 unlock(&mux->lock); 240 _RENDEZVOUS((unsigned long)&b->datawait, 0); 241 lock(&mux->lock); 242 ngot = b->putnext - b->getnext; 243 } 244 if(ngot == 0) { 245 unlock(&mux->lock); 246 goto goteof; 247 } 248 if(ngot > nwant) 249 ngot = nwant; 250 memcpy(addr, b->getnext, ngot); 251 b->getnext += ngot; 252 b->n -= ngot; 253 if(b->getnext == b->putnext && b->roomwait) { 254 b->getnext = b->putnext = b->data; 255 b->roomwait = 0; 256 unlock(&mux->lock); 257 /* wake up copy process */ 258 _RENDEZVOUS((unsigned long)&b->roomwait, 0); 259 } else 260 unlock(&mux->lock); 261 return ngot; 262 } 263 264 int 265 select(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, struct timeval *timeout) 266 { 267 int n, i, t, slots, fd, err; 268 Fdinfo *f; 269 Muxbuf *b; 270 271 if(timeout) 272 t = timeout->tv_sec*1000 + (timeout->tv_usec+999)/1000; 273 else 274 t = -1; 275 if(!((rfds && FD_ANYSET(rfds)) || (wfds && FD_ANYSET(wfds)) 276 || (efds && FD_ANYSET(efds)))) { 277 /* no requested fds */ 278 if(t > 0) 279 _SLEEP(t); 280 return 0; 281 } 282 283 _startbuf(-1); 284 285 /* make sure all requested rfds and efds are buffered */ 286 if(nfds >= OPEN_MAX) 287 nfds = OPEN_MAX; 288 for(i = 0; i < nfds; i++) 289 if((rfds && FD_ISSET(i, rfds)) || (efds && FD_ISSET(i, efds))){ 290 f = &_fdinfo[i]; 291 if(!(f->flags&FD_BUFFERED)) 292 if(_startbuf(i) != 0) 293 return -1; 294 } 295 296 /* check wfds; for now, we'll say they are all ready */ 297 n = 0; 298 if(wfds && FD_ANYSET(wfds)){ 299 for(i = 0; i<nfds; i++) 300 if(FD_ISSET(i, wfds)) { 301 n++; 302 } 303 } 304 305 lock(&mux->lock); 306 307 slots = mux->curfds; 308 FD_ZERO(&mux->rwant); 309 FD_ZERO(&mux->ewant); 310 311 for(i = 0; i<slots; i++) { 312 b = &mux->bufs[i]; 313 fd = b->fd; 314 if(fd == -1) 315 continue; 316 err = 0; 317 if(efds && FD_ISSET(fd, efds)) { 318 if(b->eof && b->n == 0){ 319 err = 1; 320 n++; 321 }else{ 322 FD_CLR(fd, efds); 323 FD_SET(fd, &mux->ewant); 324 } 325 } 326 if(rfds && FD_ISSET(fd, rfds)) { 327 if(!err && (b->n > 0 || b->eof)) 328 n++; 329 else{ 330 FD_CLR(fd, rfds); 331 FD_SET(fd, &mux->rwant); 332 } 333 } 334 } 335 if(n || !(FD_ANYSET(&mux->rwant) || FD_ANYSET(&mux->ewant)) || t == 0) { 336 FD_ZERO(&mux->rwant); 337 FD_ZERO(&mux->ewant); 338 unlock(&mux->lock); 339 return n; 340 } 341 342 if(timeout) { 343 mux->waittime = t; 344 if(timerpid == -1) 345 _timerproc(); 346 else 347 _resettimer(); 348 } 349 mux->selwait = 1; 350 unlock(&mux->lock); 351 fd = _RENDEZVOUS((unsigned long)&mux->selwait, 0); 352 if(fd >= 0) { 353 b = _fdinfo[fd].buf; 354 if(FD_ISSET(fd, &mux->rwant)) { 355 FD_SET(fd, rfds); 356 n = 1; 357 } else if(FD_ISSET(fd, &mux->ewant) && b->eof && b->n == 0) { 358 FD_SET(fd, efds); 359 n = 1; 360 } 361 } 362 FD_ZERO(&mux->rwant); 363 FD_ZERO(&mux->ewant); 364 return n; 365 } 366 367 static int timerreset; 368 static int timerpid; 369 370 static void 371 alarmed(int) 372 { 373 timerreset = 1; 374 } 375 376 /* a little over an hour */ 377 #define LONGWAIT 4000001 378 379 static void 380 _killtimerproc(void) 381 { 382 if(timerpid > 0) 383 kill(timerpid, SIGKILL); 384 } 385 386 static void 387 _timerproc(void) 388 { 389 int i; 390 391 if((timerpid = _RFORK(RFFDG|RFPROC|RFNOWAIT)) == 0){ 392 /* timer process */ 393 setpgid(getpid(), _muxsid); 394 signal(SIGALRM, alarmed); 395 for(i=0; i<OPEN_MAX; i++) 396 _CLOSE(i); 397 _RENDEZVOUS(1, 0); 398 for(;;) { 399 _SLEEP(mux->waittime); 400 if(timerreset) { 401 timerreset = 0; 402 } else { 403 lock(&mux->lock); 404 if(mux->selwait && mux->waittime != LONGWAIT) { 405 mux->selwait = 0; 406 mux->waittime = LONGWAIT; 407 unlock(&mux->lock); 408 _RENDEZVOUS((unsigned long)&mux->selwait, -2); 409 } else { 410 mux->waittime = LONGWAIT; 411 unlock(&mux->lock); 412 } 413 } 414 } 415 } 416 atexit(_killtimerproc); 417 /* parent process continues */ 418 _RENDEZVOUS(1, 0); 419 } 420 421 static void 422 _resettimer(void) 423 { 424 kill(timerpid, SIGALRM); 425 } 426 427 void 428 _killmuxsid(void) 429 { 430 if(_muxsid != -1 && (_mainpid == getpid() || _mainpid == -1)) 431 kill(-_muxsid,SIGTERM); 432 } 433 434 /* call this on fork(), because reading a BUFFERED fd won't work in child */ 435 void 436 _detachbuf(void) 437 { 438 int i; 439 Fdinfo *f; 440 441 if(mux == 0) 442 return; 443 _SEGDETACH(mux); 444 for(i = 0; i < OPEN_MAX; i++){ 445 f = &_fdinfo[i]; 446 if(f->flags&FD_BUFFERED) 447 f->flags = (f->flags&~FD_BUFFERED) | FD_BUFFEREDX; 448 /* mark 'poisoned' */ 449 } 450 mux = 0; 451 _muxsid = -1; 452 _mainpid = -1; 453 timerpid = -1; 454 } 455 456 static int 457 copynotehandler(void *, char *) 458 { 459 if(_finishing) 460 _finish(0, 0); 461 _NOTED(1); 462 return 0; 463 } 464