/***********************************************************************
*                                                                      *
*               This software is part of the ast package               *
*          Copyright (c) 1985-2010 AT&T Intellectual Property          *
*                      and is licensed under the                       *
*                  Common Public License, Version 1.0                  *
*                    by AT&T Intellectual Property                     *
*                                                                      *
*                A copy of the License is available at                 *
*            http://www.opensource.org/licenses/cpl1.0.txt             *
*         (with md5 checksum 059e8cd6165cb4c31e351f2b69388fd9)         *
*                                                                      *
*              Information and Software Systems Research               *
*                            AT&T Research                             *
*                           Florham Park NJ                            *
*                                                                      *
*                 Glenn Fowler <gsf@research.att.com>                  *
*                  David Korn <dgk@research.att.com>                   *
*                   Phong Vo <kpv@research.att.com>                    *
*                                                                      *
***********************************************************************/
224887Schin #include "sfhdr.h"
234887Schin
244887Schin /* Poll a set of streams to see if any is available for I/O.
254887Schin ** Ready streams are moved to front of array but retain the
264887Schin ** same relative order.
274887Schin **
284887Schin ** Written by Kiem-Phong Vo.
294887Schin */
304887Schin
314887Schin #if __STD_C
sfpoll(Sfio_t ** fa,reg int n,int tm)324887Schin int sfpoll(Sfio_t** fa, reg int n, int tm)
334887Schin #else
344887Schin int sfpoll(fa, n, tm)
354887Schin Sfio_t** fa; /* array of streams to poll */
364887Schin reg int n; /* number of streams in array */
374887Schin int tm; /* time in millisecs for select/poll */
384887Schin #endif
394887Schin {
404887Schin reg int r, c, m, np;
414887Schin reg Sfio_t* f;
424887Schin reg int *status, *check;
434887Schin
444887Schin if(n <= 0 || !fa)
454887Schin return -1;
464887Schin
474887Schin if(!(status = (int*)malloc(2*n*sizeof(int))) )
484887Schin return -1;
494887Schin check = status+n; /* streams that need polling */
504887Schin
514887Schin /* a SF_READ stream is ready if there is buffered read data */
524887Schin #define RDREADY(f) (((f->mode&SF_READ) && f->next < f->endb) || \
534887Schin ((f->mode&SF_WRITE) && f->proc && f->proc->ndata > 0) )
544887Schin
554887Schin /* a SF_WRITE stream is ready if there is no write data */
564887Schin #define WRREADY(f) (!(f->mode&SF_WRITE) || f->next == f->data)
574887Schin
584887Schin #define HASAUXFD(f) (f->proc && f->proc->file >= 0 && f->proc->file != f->file)
594887Schin
604887Schin for(r = c = 0; r < n; ++r) /* compute streams that must be checked */
614887Schin { f = fa[r];
624887Schin status[r] = 0;
634887Schin
644887Schin /* check accessibility */
654887Schin m = f->mode&SF_RDWR;
664887Schin if((int)f->mode != m && _sfmode(f,m,0) < 0)
674887Schin continue;
684887Schin
694887Schin if((f->flags&SF_READ) && RDREADY(f))
704887Schin status[r] |= SF_READ;
714887Schin
724887Schin if((f->flags&SF_WRITE) && WRREADY(f))
734887Schin status[r] |= SF_WRITE;
744887Schin
754887Schin if((f->flags&SF_RDWR) == status[r])
764887Schin continue;
774887Schin
784887Schin /* has discipline, ask its opinion */
794887Schin if(f->disc && f->disc->exceptf)
804887Schin { if((m = (*f->disc->exceptf)(f,SF_DPOLL,&tm,f->disc)) < 0)
814887Schin continue;
824887Schin else if(m > 0)
834887Schin { status[r] = m&SF_RDWR;
844887Schin continue;
854887Schin }
864887Schin }
874887Schin
884887Schin if(f->extent < 0) /* unseekable stream, must poll/select */
894887Schin check[c++] = r;
904887Schin else /* seekable streams are always ready */
914887Schin { if(f->flags&SF_READ)
924887Schin status[r] |= SF_READ;
934887Schin if(f->flags&SF_WRITE)
944887Schin status[r] |= SF_WRITE;
954887Schin }
964887Schin }
974887Schin
984887Schin np = -1;
994887Schin #if _lib_poll
1004887Schin if(c > 0)
1014887Schin { struct pollfd* fds;
1024887Schin
1034887Schin /* construct the poll array */
1044887Schin for(m = 0, r = 0; r < c; ++r, ++m)
1054887Schin { f = fa[check[r]];
1064887Schin if(HASAUXFD(f))
1074887Schin m += 1;
1084887Schin }
1094887Schin if(!(fds = (struct pollfd*)malloc(m*sizeof(struct pollfd))) )
1104887Schin return -1;
1114887Schin
1124887Schin for(m = 0, r = 0; r < c; ++r, ++m)
1134887Schin { f = fa[check[r]];
1144887Schin
1154887Schin fds[m].fd = f->file;
1164887Schin fds[m].events = fds[m].revents = 0;
1174887Schin
1184887Schin if((f->flags&SF_WRITE) && !WRREADY(f) )
1194887Schin fds[m].events |= POLLOUT;
1204887Schin
1214887Schin if((f->flags&SF_READ) && !RDREADY(f) )
1224887Schin { /* a sfpopen situation with two file descriptors */
1234887Schin if((f->mode&SF_WRITE) && HASAUXFD(f))
1244887Schin { m += 1;
1254887Schin fds[m].fd = f->proc->file;
1264887Schin fds[m].revents = 0;
1274887Schin }
1284887Schin
1294887Schin fds[m].events |= POLLIN;
1304887Schin }
1314887Schin }
1324887Schin
1334887Schin while((np = SFPOLL(fds,m,tm)) < 0 )
1344887Schin { if(errno == EINTR || errno == EAGAIN)
1354887Schin errno = 0;
1364887Schin else break;
1374887Schin }
1384887Schin if(np > 0) /* poll succeeded */
1394887Schin np = c;
1404887Schin
1414887Schin for(m = 0, r = 0; r < np; ++r, ++m)
1424887Schin { f = fa[check[r]];
1434887Schin
1444887Schin if((f->flags&SF_WRITE) && !WRREADY(f) )
1454887Schin { if(fds[m].revents&POLLOUT)
1464887Schin status[check[r]] |= SF_WRITE;
1474887Schin }
1484887Schin
1494887Schin if((f->flags&SF_READ) && !RDREADY(f))
1504887Schin { if((f->mode&SF_WRITE) && HASAUXFD(f))
1514887Schin m += 1;
1524887Schin if(fds[m].revents&POLLIN)
1534887Schin status[check[r]] |= SF_READ;
1544887Schin }
1554887Schin }
1564887Schin
1574887Schin free((Void_t*)fds);
1584887Schin }
1594887Schin #endif /*_lib_poll*/
1604887Schin
1614887Schin #if _lib_select
1624887Schin if(np < 0 && c > 0)
1634887Schin { fd_set rd, wr;
1644887Schin struct timeval tmb, *tmp;
1654887Schin
1664887Schin FD_ZERO(&rd);
1674887Schin FD_ZERO(&wr);
1684887Schin m = 0;
1694887Schin for(r = 0; r < c; ++r)
1704887Schin { f = fa[check[r]];
1714887Schin
1724887Schin if(f->file > m)
1734887Schin m = f->file;
1744887Schin
1754887Schin if((f->flags&SF_WRITE) && !WRREADY(f))
1764887Schin FD_SET(f->file,&wr);
1774887Schin
1784887Schin if((f->flags&SF_READ) && !RDREADY(f))
1794887Schin { if((f->mode&SF_WRITE) && HASAUXFD(f))
1804887Schin { if(f->proc->file > m)
1814887Schin m = f->proc->file;
1824887Schin FD_SET(f->proc->file, &rd);
1834887Schin }
1844887Schin else FD_SET(f->file,&rd);
1854887Schin }
1864887Schin }
1874887Schin if(tm < 0)
1884887Schin tmp = NIL(struct timeval*);
1894887Schin else
1904887Schin { tmp = &tmb;
1914887Schin tmb.tv_sec = tm/SECOND;
1924887Schin tmb.tv_usec = (tm%SECOND)*SECOND;
1934887Schin }
1944887Schin
1954887Schin while((np = select(m+1,&rd,&wr,NIL(fd_set*),tmp)) < 0 )
1964887Schin { if(errno == EINTR)
1974887Schin errno = 0;
1984887Schin else break;
1994887Schin }
2004887Schin if(np > 0)
2014887Schin np = c;
2024887Schin
2034887Schin for(r = 0; r < np; ++r)
2044887Schin { f = fa[check[r]];
2054887Schin
2064887Schin if((f->flags&SF_WRITE) && !WRREADY(f) )
2074887Schin { if(FD_ISSET(f->file,&wr) )
2084887Schin status[check[r]] |= SF_WRITE;
2094887Schin }
2104887Schin
2114887Schin if((f->flags&SF_READ) && !RDREADY(f) )
2124887Schin { if((f->mode&SF_WRITE) && HASAUXFD(f) )
2134887Schin { if(FD_ISSET(f->proc->file, &rd) )
2144887Schin status[check[r]] |= SF_READ;
2154887Schin }
2164887Schin else
2174887Schin { if(FD_ISSET(f->file,&rd) )
2184887Schin status[check[r]] |= SF_READ;
2194887Schin }
2204887Schin }
2214887Schin }
2224887Schin }
2234887Schin #endif /*_lib_select*/
2244887Schin
2254887Schin for(r = c = 0; c < n; ++c)
2264887Schin { if(status[c] == 0)
2274887Schin continue;
2284887Schin
2294887Schin f = fa[c];
2304887Schin f->val = (ssize_t)status[c];
2314887Schin
2324887Schin /* announce status */
2334887Schin if(f->disc && f->disc->exceptf)
2344887Schin (*f->disc->exceptf)(f,SF_READY,(Void_t*)(long)status[c],f->disc);
2354887Schin
2364887Schin if(c > r) /* move to front of list */
2374887Schin { fa[c] = fa[r];
2384887Schin fa[r] = f;
2394887Schin }
2404887Schin r += 1;
2414887Schin }
2424887Schin
2434887Schin free((Void_t*)status);
2444887Schin return r;
2454887Schin }
246