12248Sraf /*
22248Sraf * CDDL HEADER START
32248Sraf *
42248Sraf * The contents of this file are subject to the terms of the
52248Sraf * Common Development and Distribution License (the "License").
62248Sraf * You may not use this file except in compliance with the License.
72248Sraf *
82248Sraf * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
92248Sraf * or http://www.opensolaris.org/os/licensing.
102248Sraf * See the License for the specific language governing permissions
112248Sraf * and limitations under the License.
122248Sraf *
132248Sraf * When distributing Covered Code, include this CDDL HEADER in each
142248Sraf * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
152248Sraf * If applicable, add the following below this CDDL HEADER, with the
162248Sraf * fields enclosed by brackets "[]" replaced with your own identifying
172248Sraf * information: Portions Copyright [yyyy] [name of copyright owner]
182248Sraf *
192248Sraf * CDDL HEADER END
202248Sraf */
212248Sraf
222248Sraf /*
235891Sraf * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
242248Sraf * Use is subject to license terms.
252248Sraf */
262248Sraf
272248Sraf #pragma ident "%Z%%M% %I% %E% SMI"
282248Sraf
296812Sraf #include "lint.h"
302248Sraf #include "thr_uberdata.h"
312248Sraf #include "asyncio.h"
322248Sraf #include <atomic.h>
332248Sraf #include <sys/param.h>
342248Sraf #include <sys/file.h>
352248Sraf #include <sys/port.h>
362248Sraf
372248Sraf static int _aio_hash_insert(aio_result_t *, aio_req_t *);
382248Sraf static aio_req_t *_aio_req_get(aio_worker_t *);
392248Sraf static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
402248Sraf static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
412248Sraf static void _aio_work_done(aio_worker_t *);
422248Sraf static void _aio_enq_doneq(aio_req_t *);
432248Sraf
442248Sraf extern void _aio_lio_free(aio_lio_t *);
452248Sraf
462248Sraf extern int __fdsync(int, int);
475937Sraf extern int __fcntl(int, int, ...);
482248Sraf extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
492248Sraf
502248Sraf static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
512248Sraf static void _aiodone(aio_req_t *, ssize_t, int);
522248Sraf static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
532248Sraf static void _aio_finish_request(aio_worker_t *, ssize_t, int);
542248Sraf
552248Sraf /*
562248Sraf * switch for kernel async I/O
572248Sraf */
582248Sraf int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */
592248Sraf
602248Sraf /*
612248Sraf * Key for thread-specific data
622248Sraf */
632248Sraf pthread_key_t _aio_key;
642248Sraf
652248Sraf /*
662248Sraf * Array for determining whether or not a file supports kaio.
672248Sraf * Initialized in _kaio_init().
682248Sraf */
692248Sraf uint32_t *_kaio_supported = NULL;
702248Sraf
712248Sraf /*
722248Sraf * workers for read/write requests
732248Sraf * (__aio_mutex lock protects circular linked list of workers)
742248Sraf */
752248Sraf aio_worker_t *__workers_rw; /* circular list of AIO workers */
762248Sraf aio_worker_t *__nextworker_rw; /* next worker in list of workers */
772248Sraf int __rw_workerscnt; /* number of read/write workers */
782248Sraf
792248Sraf /*
802248Sraf * worker for notification requests.
812248Sraf */
822248Sraf aio_worker_t *__workers_no; /* circular list of AIO workers */
832248Sraf aio_worker_t *__nextworker_no; /* next worker in list of workers */
842248Sraf int __no_workerscnt; /* number of write workers */
852248Sraf
862248Sraf aio_req_t *_aio_done_tail; /* list of done requests */
872248Sraf aio_req_t *_aio_done_head;
882248Sraf
892248Sraf mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */
902248Sraf cond_t __aio_initcv = DEFAULTCV;
912248Sraf int __aio_initbusy = 0;
922248Sraf
932248Sraf mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */
942248Sraf cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */
952248Sraf
962248Sraf pid_t __pid = (pid_t)-1; /* initialize as invalid pid */
972248Sraf int _sigio_enabled = 0; /* when set, send SIGIO signal */
982248Sraf
992248Sraf aio_hash_t *_aio_hash;
1002248Sraf
1012248Sraf aio_req_t *_aio_doneq; /* double linked done queue list */
1022248Sraf
1032248Sraf int _aio_donecnt = 0;
1042248Sraf int _aio_waitncnt = 0; /* # of requests for aio_waitn */
1052248Sraf int _aio_doneq_cnt = 0;
1062248Sraf int _aio_outstand_cnt = 0; /* # of outstanding requests */
1072248Sraf int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */
1082248Sraf int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */
1092248Sraf int _aio_kernel_suspend = 0; /* active kernel kaio calls */
1102248Sraf int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */
1112248Sraf
1122248Sraf int _max_workers = 256; /* max number of workers permitted */
1132248Sraf int _min_workers = 4; /* min number of workers */
1142248Sraf int _minworkload = 2; /* min number of request in q */
1152248Sraf int _aio_worker_cnt = 0; /* number of workers to do requests */
1162248Sraf int __uaio_ok = 0; /* AIO has been enabled */
1172248Sraf sigset_t _worker_set; /* worker's signal mask */
1182248Sraf
1192248Sraf int _aiowait_flag = 0; /* when set, aiowait() is inprogress */
1202248Sraf int _aio_flags = 0; /* see asyncio.h defines for */
1212248Sraf
1222248Sraf aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */
1232248Sraf
1242248Sraf int hz; /* clock ticks per second */
1252248Sraf
1262248Sraf static int
_kaio_supported_init(void)1272248Sraf _kaio_supported_init(void)
1282248Sraf {
1292248Sraf void *ptr;
1302248Sraf size_t size;
1312248Sraf
1322248Sraf if (_kaio_supported != NULL) /* already initialized */
1332248Sraf return (0);
1342248Sraf
1352248Sraf size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
1362248Sraf ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
1372248Sraf MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
1382248Sraf if (ptr == MAP_FAILED)
1392248Sraf return (-1);
1402248Sraf _kaio_supported = ptr;
1412248Sraf return (0);
1422248Sraf }
1432248Sraf
1442248Sraf /*
1452248Sraf * The aio subsystem is initialized when an AIO request is made.
1462248Sraf * Constants are initialized like the max number of workers that
1472248Sraf * the subsystem can create, and the minimum number of workers
1482248Sraf * permitted before imposing some restrictions. Also, some
1492248Sraf * workers are created.
1502248Sraf */
1512248Sraf int
__uaio_init(void)1522248Sraf __uaio_init(void)
1532248Sraf {
1542248Sraf int ret = -1;
1552248Sraf int i;
1565891Sraf int cancel_state;
1572248Sraf
1582248Sraf lmutex_lock(&__aio_initlock);
1595891Sraf (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
1602248Sraf while (__aio_initbusy)
1615891Sraf (void) cond_wait(&__aio_initcv, &__aio_initlock);
1625891Sraf (void) pthread_setcancelstate(cancel_state, NULL);
1632248Sraf if (__uaio_ok) { /* already initialized */
1642248Sraf lmutex_unlock(&__aio_initlock);
1652248Sraf return (0);
1662248Sraf }
1672248Sraf __aio_initbusy = 1;
1682248Sraf lmutex_unlock(&__aio_initlock);
1692248Sraf
1702248Sraf hz = (int)sysconf(_SC_CLK_TCK);
1712248Sraf __pid = getpid();
1722248Sraf
1732248Sraf setup_cancelsig(SIGAIOCANCEL);
1742248Sraf
1752248Sraf if (_kaio_supported_init() != 0)
1762248Sraf goto out;
1772248Sraf
1782248Sraf /*
1792248Sraf * Allocate and initialize the hash table.
1803344Ssp92102 * Do this only once, even if __uaio_init() is called twice.
1812248Sraf */
1823344Ssp92102 if (_aio_hash == NULL) {
1833344Ssp92102 /* LINTED pointer cast */
1843344Ssp92102 _aio_hash = (aio_hash_t *)mmap(NULL,
1853344Ssp92102 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
1863344Ssp92102 MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
1873344Ssp92102 if ((void *)_aio_hash == MAP_FAILED) {
1883344Ssp92102 _aio_hash = NULL;
1893344Ssp92102 goto out;
1903344Ssp92102 }
1913344Ssp92102 for (i = 0; i < HASHSZ; i++)
1923344Ssp92102 (void) mutex_init(&_aio_hash[i].hash_lock,
1933344Ssp92102 USYNC_THREAD, NULL);
1942248Sraf }
1952248Sraf
1962248Sraf /*
1972248Sraf * Initialize worker's signal mask to only catch SIGAIOCANCEL.
1982248Sraf */
1992248Sraf (void) sigfillset(&_worker_set);
2002248Sraf (void) sigdelset(&_worker_set, SIGAIOCANCEL);
2012248Sraf
2022248Sraf /*
2033344Ssp92102 * Create one worker to send asynchronous notifications.
2043344Ssp92102 * Do this only once, even if __uaio_init() is called twice.
2053344Ssp92102 */
2063344Ssp92102 if (__no_workerscnt == 0 &&
2073344Ssp92102 (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
2083344Ssp92102 errno = EAGAIN;
2093344Ssp92102 goto out;
2103344Ssp92102 }
2113344Ssp92102
2123344Ssp92102 /*
2132248Sraf * Create the minimum number of read/write workers.
2143344Ssp92102 * And later check whether atleast one worker is created;
2153344Ssp92102 * lwp_create() calls could fail because of segkp exhaustion.
2162248Sraf */
2172248Sraf for (i = 0; i < _min_workers; i++)
2182248Sraf (void) _aio_create_worker(NULL, AIOREAD);
2193344Ssp92102 if (__rw_workerscnt == 0) {
2203344Ssp92102 errno = EAGAIN;
2213344Ssp92102 goto out;
2223344Ssp92102 }
2232248Sraf
2242248Sraf ret = 0;
2252248Sraf out:
2262248Sraf lmutex_lock(&__aio_initlock);
2272248Sraf if (ret == 0)
2282248Sraf __uaio_ok = 1;
2292248Sraf __aio_initbusy = 0;
2302248Sraf (void) cond_broadcast(&__aio_initcv);
2312248Sraf lmutex_unlock(&__aio_initlock);
2322248Sraf return (ret);
2332248Sraf }
2342248Sraf
2352248Sraf /*
2362248Sraf * Called from close() before actually performing the real _close().
2372248Sraf */
2382248Sraf void
_aio_close(int fd)2392248Sraf _aio_close(int fd)
2402248Sraf {
2412248Sraf if (fd < 0) /* avoid cancelling everything */
2422248Sraf return;
2432248Sraf /*
2442248Sraf * Cancel all outstanding aio requests for this file descriptor.
2452248Sraf */
2462248Sraf if (__uaio_ok)
2472248Sraf (void) aiocancel_all(fd);
2482248Sraf /*
2492248Sraf * If we have allocated the bit array, clear the bit for this file.
2502248Sraf * The next open may re-use this file descriptor and the new file
2512248Sraf * may have different kaio() behaviour.
2522248Sraf */
2532248Sraf if (_kaio_supported != NULL)
2542248Sraf CLEAR_KAIO_SUPPORTED(fd);
2552248Sraf }
2562248Sraf
2572248Sraf /*
2582248Sraf * special kaio cleanup thread sits in a loop in the
2592248Sraf * kernel waiting for pending kaio requests to complete.
2602248Sraf */
2612248Sraf void *
_kaio_cleanup_thread(void * arg)2622248Sraf _kaio_cleanup_thread(void *arg)
2632248Sraf {
2642248Sraf if (pthread_setspecific(_aio_key, arg) != 0)
2652248Sraf aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
2662248Sraf (void) _kaio(AIOSTART);
2672248Sraf return (arg);
2682248Sraf }
2692248Sraf
2702248Sraf /*
2712248Sraf * initialize kaio.
2722248Sraf */
2732248Sraf void
_kaio_init()2742248Sraf _kaio_init()
2752248Sraf {
2762248Sraf int error;
2772248Sraf sigset_t oset;
2785891Sraf int cancel_state;
2792248Sraf
2802248Sraf lmutex_lock(&__aio_initlock);
2815891Sraf (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
2822248Sraf while (__aio_initbusy)
2835891Sraf (void) cond_wait(&__aio_initcv, &__aio_initlock);
2845891Sraf (void) pthread_setcancelstate(cancel_state, NULL);
2852248Sraf if (_kaio_ok) { /* already initialized */
2862248Sraf lmutex_unlock(&__aio_initlock);
2872248Sraf return;
2882248Sraf }
2892248Sraf __aio_initbusy = 1;
2902248Sraf lmutex_unlock(&__aio_initlock);
2912248Sraf
2922248Sraf if (_kaio_supported_init() != 0)
2932248Sraf error = ENOMEM;
2942248Sraf else if ((_kaiowp = _aio_worker_alloc()) == NULL)
2952248Sraf error = ENOMEM;
2962248Sraf else if ((error = (int)_kaio(AIOINIT)) == 0) {
2972248Sraf (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
2982248Sraf error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
2992248Sraf _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
3002248Sraf (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
3012248Sraf }
3022248Sraf if (error && _kaiowp != NULL) {
3032248Sraf _aio_worker_free(_kaiowp);
3042248Sraf _kaiowp = NULL;
3052248Sraf }
3062248Sraf
3072248Sraf lmutex_lock(&__aio_initlock);
3082248Sraf if (error)
3092248Sraf _kaio_ok = -1;
3102248Sraf else
3112248Sraf _kaio_ok = 1;
3122248Sraf __aio_initbusy = 0;
3132248Sraf (void) cond_broadcast(&__aio_initcv);
3142248Sraf lmutex_unlock(&__aio_initlock);
3152248Sraf }
3162248Sraf
3172248Sraf int
aioread(int fd,caddr_t buf,int bufsz,off_t offset,int whence,aio_result_t * resultp)3182248Sraf aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
3192248Sraf aio_result_t *resultp)
3202248Sraf {
3212248Sraf return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
3222248Sraf }
3232248Sraf
3242248Sraf int
aiowrite(int fd,caddr_t buf,int bufsz,off_t offset,int whence,aio_result_t * resultp)3252248Sraf aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
3262248Sraf aio_result_t *resultp)
3272248Sraf {
3282248Sraf return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
3292248Sraf }
3302248Sraf
3312248Sraf #if !defined(_LP64)
3322248Sraf int
aioread64(int fd,caddr_t buf,int bufsz,off64_t offset,int whence,aio_result_t * resultp)3332248Sraf aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
3342248Sraf aio_result_t *resultp)
3352248Sraf {
3362248Sraf return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
3372248Sraf }
3382248Sraf
3392248Sraf int
aiowrite64(int fd,caddr_t buf,int bufsz,off64_t offset,int whence,aio_result_t * resultp)3402248Sraf aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
3412248Sraf aio_result_t *resultp)
3422248Sraf {
3432248Sraf return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
3442248Sraf }
3452248Sraf #endif /* !defined(_LP64) */
3462248Sraf
3472248Sraf int
_aiorw(int fd,caddr_t buf,int bufsz,offset_t offset,int whence,aio_result_t * resultp,int mode)3482248Sraf _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
3492248Sraf aio_result_t *resultp, int mode)
3502248Sraf {
3512248Sraf aio_req_t *reqp;
3522248Sraf aio_args_t *ap;
3532248Sraf offset_t loffset;
3545535Spraks struct stat64 stat64;
3552248Sraf int error = 0;
3562248Sraf int kerr;
3572248Sraf int umode;
3582248Sraf
3592248Sraf switch (whence) {
3602248Sraf
3612248Sraf case SEEK_SET:
3622248Sraf loffset = offset;
3632248Sraf break;
3642248Sraf case SEEK_CUR:
3652248Sraf if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
3662248Sraf error = -1;
3672248Sraf else
3682248Sraf loffset += offset;
3692248Sraf break;
3702248Sraf case SEEK_END:
3715535Spraks if (fstat64(fd, &stat64) == -1)
3722248Sraf error = -1;
3732248Sraf else
3745535Spraks loffset = offset + stat64.st_size;
3752248Sraf break;
3762248Sraf default:
3772248Sraf errno = EINVAL;
3782248Sraf error = -1;
3792248Sraf }
3802248Sraf
3812248Sraf if (error)
3822248Sraf return (error);
3832248Sraf
3842248Sraf /* initialize kaio */
3852248Sraf if (!_kaio_ok)
3862248Sraf _kaio_init();
3872248Sraf
3882248Sraf /*
3892248Sraf * _aio_do_request() needs the original request code (mode) to be able
3902248Sraf * to choose the appropiate 32/64 bit function. All other functions
3912248Sraf * only require the difference between READ and WRITE (umode).
3922248Sraf */
3932248Sraf if (mode == AIOAREAD64 || mode == AIOAWRITE64)
3942248Sraf umode = mode - AIOAREAD64;
3952248Sraf else
3962248Sraf umode = mode;
3972248Sraf
3982248Sraf /*
3992248Sraf * Try kernel aio first.
4002248Sraf * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
4012248Sraf */
4022248Sraf if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
4032248Sraf resultp->aio_errno = 0;
4042248Sraf sig_mutex_lock(&__aio_mutex);
4052248Sraf _kaio_outstand_cnt++;
4065535Spraks sig_mutex_unlock(&__aio_mutex);
4072248Sraf kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
4082248Sraf (umode | AIO_POLL_BIT) : umode),
4092248Sraf fd, buf, bufsz, loffset, resultp);
4102248Sraf if (kerr == 0) {
4112248Sraf return (0);
4122248Sraf }
4135535Spraks sig_mutex_lock(&__aio_mutex);
4142248Sraf _kaio_outstand_cnt--;
4152248Sraf sig_mutex_unlock(&__aio_mutex);
4162248Sraf if (errno != ENOTSUP && errno != EBADFD)
4172248Sraf return (-1);
4182248Sraf if (errno == EBADFD)
4192248Sraf SET_KAIO_NOT_SUPPORTED(fd);
4202248Sraf }
4212248Sraf
4222248Sraf if (!__uaio_ok && __uaio_init() == -1)
4232248Sraf return (-1);
4242248Sraf
4252248Sraf if ((reqp = _aio_req_alloc()) == NULL) {
4262248Sraf errno = EAGAIN;
4272248Sraf return (-1);
4282248Sraf }
4292248Sraf
4302248Sraf /*
4312248Sraf * _aio_do_request() checks reqp->req_op to differentiate
4322248Sraf * between 32 and 64 bit access.
4332248Sraf */
4342248Sraf reqp->req_op = mode;
4352248Sraf reqp->req_resultp = resultp;
4362248Sraf ap = &reqp->req_args;
4372248Sraf ap->fd = fd;
4382248Sraf ap->buf = buf;
4392248Sraf ap->bufsz = bufsz;
4402248Sraf ap->offset = loffset;
4412248Sraf
4422248Sraf if (_aio_hash_insert(resultp, reqp) != 0) {
4432248Sraf _aio_req_free(reqp);
4442248Sraf errno = EINVAL;
4452248Sraf return (-1);
4462248Sraf }
4472248Sraf /*
4482248Sraf * _aio_req_add() only needs the difference between READ and
4492248Sraf * WRITE to choose the right worker queue.
4502248Sraf */
4512248Sraf _aio_req_add(reqp, &__nextworker_rw, umode);
4522248Sraf return (0);
4532248Sraf }
4542248Sraf
4552248Sraf int
aiocancel(aio_result_t * resultp)4562248Sraf aiocancel(aio_result_t *resultp)
4572248Sraf {
4582248Sraf aio_req_t *reqp;
4592248Sraf aio_worker_t *aiowp;
4602248Sraf int ret;
4612248Sraf int done = 0;
4622248Sraf int canceled = 0;
4632248Sraf
4642248Sraf if (!__uaio_ok) {
4652248Sraf errno = EINVAL;
4662248Sraf return (-1);
4672248Sraf }
4682248Sraf
4692248Sraf sig_mutex_lock(&__aio_mutex);
4702248Sraf reqp = _aio_hash_find(resultp);
4712248Sraf if (reqp == NULL) {
4722248Sraf if (_aio_outstand_cnt == _aio_req_done_cnt)
4732248Sraf errno = EINVAL;
4742248Sraf else
4752248Sraf errno = EACCES;
4762248Sraf ret = -1;
4772248Sraf } else {
4782248Sraf aiowp = reqp->req_worker;
4792248Sraf sig_mutex_lock(&aiowp->work_qlock1);
4802248Sraf (void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
4812248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
4822248Sraf
4832248Sraf if (canceled) {
4842248Sraf ret = 0;
4852248Sraf } else {
4862248Sraf if (_aio_outstand_cnt == 0 ||
4872248Sraf _aio_outstand_cnt == _aio_req_done_cnt)
4882248Sraf errno = EINVAL;
4892248Sraf else
4902248Sraf errno = EACCES;
4912248Sraf ret = -1;
4922248Sraf }
4932248Sraf }
4942248Sraf sig_mutex_unlock(&__aio_mutex);
4952248Sraf return (ret);
4962248Sraf }
4972248Sraf
4985891Sraf /* ARGSUSED */
4995891Sraf static void
_aiowait_cleanup(void * arg)5005891Sraf _aiowait_cleanup(void *arg)
5015891Sraf {
5025891Sraf sig_mutex_lock(&__aio_mutex);
5035891Sraf _aiowait_flag--;
5045891Sraf sig_mutex_unlock(&__aio_mutex);
5055891Sraf }
5065891Sraf
5072248Sraf /*
5085891Sraf * This must be asynch safe and cancel safe
5092248Sraf */
5102248Sraf aio_result_t *
aiowait(struct timeval * uwait)5112248Sraf aiowait(struct timeval *uwait)
5122248Sraf {
5132248Sraf aio_result_t *uresultp;
5142248Sraf aio_result_t *kresultp;
5152248Sraf aio_result_t *resultp;
5162248Sraf int dontblock;
5172248Sraf int timedwait = 0;
5182248Sraf int kaio_errno = 0;
5192248Sraf struct timeval twait;
5202248Sraf struct timeval *wait = NULL;
5212248Sraf hrtime_t hrtend;
5222248Sraf hrtime_t hres;
5232248Sraf
5242248Sraf if (uwait) {
5252248Sraf /*
5262248Sraf * Check for a valid specified wait time.
5272248Sraf * If it is invalid, fail the call right away.
5282248Sraf */
5292248Sraf if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
5302248Sraf uwait->tv_usec >= MICROSEC) {
5312248Sraf errno = EINVAL;
5322248Sraf return ((aio_result_t *)-1);
5332248Sraf }
5342248Sraf
5352248Sraf if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
5362248Sraf hrtend = gethrtime() +
5374502Spraks (hrtime_t)uwait->tv_sec * NANOSEC +
5384502Spraks (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
5392248Sraf twait = *uwait;
5402248Sraf wait = &twait;
5412248Sraf timedwait++;
5422248Sraf } else {
5432248Sraf /* polling */
5442248Sraf sig_mutex_lock(&__aio_mutex);
5452248Sraf if (_kaio_outstand_cnt == 0) {
5462248Sraf kresultp = (aio_result_t *)-1;
5472248Sraf } else {
5482248Sraf kresultp = (aio_result_t *)_kaio(AIOWAIT,
5492248Sraf (struct timeval *)-1, 1);
5502248Sraf if (kresultp != (aio_result_t *)-1 &&
5512248Sraf kresultp != NULL &&
5522248Sraf kresultp != (aio_result_t *)1) {
5532248Sraf _kaio_outstand_cnt--;
5542248Sraf sig_mutex_unlock(&__aio_mutex);
5552248Sraf return (kresultp);
5562248Sraf }
5572248Sraf }
5582248Sraf uresultp = _aio_req_done();
5592248Sraf sig_mutex_unlock(&__aio_mutex);
5602248Sraf if (uresultp != NULL &&
5612248Sraf uresultp != (aio_result_t *)-1) {
5622248Sraf return (uresultp);
5632248Sraf }
5642248Sraf if (uresultp == (aio_result_t *)-1 &&
5652248Sraf kresultp == (aio_result_t *)-1) {
5662248Sraf errno = EINVAL;
5672248Sraf return ((aio_result_t *)-1);
5682248Sraf } else {
5692248Sraf return (NULL);
5702248Sraf }
5712248Sraf }
5722248Sraf }
5732248Sraf
5742248Sraf for (;;) {
5752248Sraf sig_mutex_lock(&__aio_mutex);
5762248Sraf uresultp = _aio_req_done();
5772248Sraf if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
5782248Sraf sig_mutex_unlock(&__aio_mutex);
5792248Sraf resultp = uresultp;
5802248Sraf break;
5812248Sraf }
5822248Sraf _aiowait_flag++;
5832248Sraf dontblock = (uresultp == (aio_result_t *)-1);
5842248Sraf if (dontblock && _kaio_outstand_cnt == 0) {
5852248Sraf kresultp = (aio_result_t *)-1;
5862248Sraf kaio_errno = EINVAL;
5872248Sraf } else {
5882248Sraf sig_mutex_unlock(&__aio_mutex);
5895891Sraf pthread_cleanup_push(_aiowait_cleanup, NULL);
5905891Sraf _cancel_prologue();
5912248Sraf kresultp = (aio_result_t *)_kaio(AIOWAIT,
5922248Sraf wait, dontblock);
5935891Sraf _cancel_epilogue();
5945891Sraf pthread_cleanup_pop(0);
5952248Sraf sig_mutex_lock(&__aio_mutex);
5962248Sraf kaio_errno = errno;
5972248Sraf }
5982248Sraf _aiowait_flag--;
5992248Sraf sig_mutex_unlock(&__aio_mutex);
6002248Sraf if (kresultp == (aio_result_t *)1) {
6012248Sraf /* aiowait() awakened by an aionotify() */
6022248Sraf continue;
6032248Sraf } else if (kresultp != NULL &&
6042248Sraf kresultp != (aio_result_t *)-1) {
6052248Sraf resultp = kresultp;
6062248Sraf sig_mutex_lock(&__aio_mutex);
6072248Sraf _kaio_outstand_cnt--;
6082248Sraf sig_mutex_unlock(&__aio_mutex);
6092248Sraf break;
6102248Sraf } else if (kresultp == (aio_result_t *)-1 &&
6112248Sraf kaio_errno == EINVAL &&
6122248Sraf uresultp == (aio_result_t *)-1) {
6132248Sraf errno = kaio_errno;
6142248Sraf resultp = (aio_result_t *)-1;
6152248Sraf break;
6162248Sraf } else if (kresultp == (aio_result_t *)-1 &&
6172248Sraf kaio_errno == EINTR) {
6182248Sraf errno = kaio_errno;
6192248Sraf resultp = (aio_result_t *)-1;
6202248Sraf break;
6212248Sraf } else if (timedwait) {
6222248Sraf hres = hrtend - gethrtime();
6232248Sraf if (hres <= 0) {
6242248Sraf /* time is up; return */
6252248Sraf resultp = NULL;
6262248Sraf break;
6272248Sraf } else {
6282248Sraf /*
6292248Sraf * Some time left. Round up the remaining time
6302248Sraf * in nanoseconds to microsec. Retry the call.
6312248Sraf */
6322248Sraf hres += (NANOSEC / MICROSEC) - 1;
6332248Sraf wait->tv_sec = hres / NANOSEC;
6342248Sraf wait->tv_usec =
6354502Spraks (hres % NANOSEC) / (NANOSEC / MICROSEC);
6362248Sraf }
6372248Sraf } else {
6382248Sraf ASSERT(kresultp == NULL && uresultp == NULL);
6392248Sraf resultp = NULL;
6402248Sraf continue;
6412248Sraf }
6422248Sraf }
6432248Sraf return (resultp);
6442248Sraf }
6452248Sraf
6462248Sraf /*
6472248Sraf * _aio_get_timedelta calculates the remaining time and stores the result
6482248Sraf * into timespec_t *wait.
6492248Sraf */
6502248Sraf
6512248Sraf int
_aio_get_timedelta(timespec_t * end,timespec_t * wait)6522248Sraf _aio_get_timedelta(timespec_t *end, timespec_t *wait)
6532248Sraf {
6542248Sraf int ret = 0;
6552248Sraf struct timeval cur;
6562248Sraf timespec_t curtime;
6572248Sraf
6582248Sraf (void) gettimeofday(&cur, NULL);
6592248Sraf curtime.tv_sec = cur.tv_sec;
6602248Sraf curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */
6612248Sraf
6622248Sraf if (end->tv_sec >= curtime.tv_sec) {
6632248Sraf wait->tv_sec = end->tv_sec - curtime.tv_sec;
6642248Sraf if (end->tv_nsec >= curtime.tv_nsec) {
6652248Sraf wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
6662248Sraf if (wait->tv_sec == 0 && wait->tv_nsec == 0)
6672248Sraf ret = -1; /* timer expired */
6682248Sraf } else {
6692248Sraf if (end->tv_sec > curtime.tv_sec) {
6702248Sraf wait->tv_sec -= 1;
6712248Sraf wait->tv_nsec = NANOSEC -
6722248Sraf (curtime.tv_nsec - end->tv_nsec);
6732248Sraf } else {
6742248Sraf ret = -1; /* timer expired */
6752248Sraf }
6762248Sraf }
6772248Sraf } else {
6782248Sraf ret = -1;
6792248Sraf }
6802248Sraf return (ret);
6812248Sraf }
6822248Sraf
6832248Sraf /*
6842248Sraf * If closing by file descriptor: we will simply cancel all the outstanding
6852248Sraf * aio`s and return. Those aio's in question will have either noticed the
6862248Sraf * cancellation notice before, during, or after initiating io.
6872248Sraf */
6882248Sraf int
aiocancel_all(int fd)6892248Sraf aiocancel_all(int fd)
6902248Sraf {
6912248Sraf aio_req_t *reqp;
692*7025Spraks aio_req_t **reqpp, *last;
6932248Sraf aio_worker_t *first;
6942248Sraf aio_worker_t *next;
6952248Sraf int canceled = 0;
6962248Sraf int done = 0;
6972248Sraf int cancelall = 0;
6982248Sraf
6992248Sraf sig_mutex_lock(&__aio_mutex);
7002248Sraf
7012248Sraf if (_aio_outstand_cnt == 0) {
7022248Sraf sig_mutex_unlock(&__aio_mutex);
7032248Sraf return (AIO_ALLDONE);
7042248Sraf }
7052248Sraf
7062248Sraf /*
7072248Sraf * Cancel requests from the read/write workers' queues.
7082248Sraf */
7092248Sraf first = __nextworker_rw;
7102248Sraf next = first;
7112248Sraf do {
7122248Sraf _aio_cancel_work(next, fd, &canceled, &done);
7132248Sraf } while ((next = next->work_forw) != first);
7142248Sraf
7152248Sraf /*
7162248Sraf * finally, check if there are requests on the done queue that
7172248Sraf * should be canceled.
7182248Sraf */
7192248Sraf if (fd < 0)
7202248Sraf cancelall = 1;
7212248Sraf reqpp = &_aio_done_tail;
722*7025Spraks last = _aio_done_tail;
7232248Sraf while ((reqp = *reqpp) != NULL) {
7242248Sraf if (cancelall || reqp->req_args.fd == fd) {
7252248Sraf *reqpp = reqp->req_next;
726*7025Spraks if (last == reqp) {
727*7025Spraks last = reqp->req_next;
728*7025Spraks }
729*7025Spraks if (_aio_done_head == reqp) {
730*7025Spraks /* this should be the last req in list */
731*7025Spraks _aio_done_head = last;
732*7025Spraks }
7332248Sraf _aio_donecnt--;
734*7025Spraks _aio_set_result(reqp, -1, ECANCELED);
7352248Sraf (void) _aio_hash_del(reqp->req_resultp);
7362248Sraf _aio_req_free(reqp);
737*7025Spraks } else {
7382248Sraf reqpp = &reqp->req_next;
739*7025Spraks last = reqp;
740*7025Spraks }
7412248Sraf }
742*7025Spraks
7432248Sraf if (cancelall) {
7442248Sraf ASSERT(_aio_donecnt == 0);
7452248Sraf _aio_done_head = NULL;
7462248Sraf }
7472248Sraf sig_mutex_unlock(&__aio_mutex);
7482248Sraf
7492248Sraf if (canceled && done == 0)
7502248Sraf return (AIO_CANCELED);
7512248Sraf else if (done && canceled == 0)
7522248Sraf return (AIO_ALLDONE);
7532248Sraf else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
7542248Sraf return ((int)_kaio(AIOCANCEL, fd, NULL));
7552248Sraf return (AIO_NOTCANCELED);
7562248Sraf }
7572248Sraf
7582248Sraf /*
7592248Sraf * Cancel requests from a given work queue. If the file descriptor
7602248Sraf * parameter, fd, is non-negative, then only cancel those requests
7612248Sraf * in this queue that are to this file descriptor. If the fd
7622248Sraf * parameter is -1, then cancel all requests.
7632248Sraf */
7642248Sraf static void
_aio_cancel_work(aio_worker_t * aiowp,int fd,int * canceled,int * done)7652248Sraf _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
7662248Sraf {
7672248Sraf aio_req_t *reqp;
7682248Sraf
7692248Sraf sig_mutex_lock(&aiowp->work_qlock1);
7702248Sraf /*
7712248Sraf * cancel queued requests first.
7722248Sraf */
7732248Sraf reqp = aiowp->work_tail1;
7742248Sraf while (reqp != NULL) {
7752248Sraf if (fd < 0 || reqp->req_args.fd == fd) {
7762248Sraf if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
7772248Sraf /*
7782248Sraf * Callers locks were dropped.
7792248Sraf * reqp is invalid; start traversing
7802248Sraf * the list from the beginning again.
7812248Sraf */
7822248Sraf reqp = aiowp->work_tail1;
7832248Sraf continue;
7842248Sraf }
7852248Sraf }
7862248Sraf reqp = reqp->req_next;
7872248Sraf }
7882248Sraf /*
7892248Sraf * Since the queued requests have been canceled, there can
7902248Sraf * only be one inprogress request that should be canceled.
7912248Sraf */
7922248Sraf if ((reqp = aiowp->work_req) != NULL &&
7932248Sraf (fd < 0 || reqp->req_args.fd == fd))
7942248Sraf (void) _aio_cancel_req(aiowp, reqp, canceled, done);
7952248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
7962248Sraf }
7972248Sraf
7982248Sraf /*
7992248Sraf * Cancel a request. Return 1 if the callers locks were temporarily
8002248Sraf * dropped, otherwise return 0.
8012248Sraf */
8022248Sraf int
_aio_cancel_req(aio_worker_t * aiowp,aio_req_t * reqp,int * canceled,int * done)8032248Sraf _aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
8042248Sraf {
8052248Sraf int ostate = reqp->req_state;
8062248Sraf
8072248Sraf ASSERT(MUTEX_HELD(&__aio_mutex));
8082248Sraf ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
8092248Sraf if (ostate == AIO_REQ_CANCELED)
8102248Sraf return (0);
811*7025Spraks if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
812*7025Spraks aiowp->work_prev1 == reqp) {
813*7025Spraks ASSERT(aiowp->work_done1 != 0);
814*7025Spraks /*
815*7025Spraks * If not on the done queue yet, just mark it CANCELED,
816*7025Spraks * _aio_work_done() will do the necessary clean up.
817*7025Spraks * This is required to ensure that aiocancel_all() cancels
818*7025Spraks * all the outstanding requests, including this one which
819*7025Spraks * is not yet on done queue but has been marked done.
820*7025Spraks */
821*7025Spraks _aio_set_result(reqp, -1, ECANCELED);
822*7025Spraks (void) _aio_hash_del(reqp->req_resultp);
823*7025Spraks reqp->req_state = AIO_REQ_CANCELED;
824*7025Spraks (*canceled)++;
825*7025Spraks return (0);
826*7025Spraks }
827*7025Spraks
8282248Sraf if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
8292248Sraf (*done)++;
8302248Sraf return (0);
8312248Sraf }
8322248Sraf if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
8332248Sraf ASSERT(POSIX_AIO(reqp));
8342248Sraf /* Cancel the queued aio_fsync() request */
8352248Sraf if (!reqp->req_head->lio_canned) {
8362248Sraf reqp->req_head->lio_canned = 1;
8372248Sraf _aio_outstand_cnt--;
8382248Sraf (*canceled)++;
8392248Sraf }
8402248Sraf return (0);
8412248Sraf }
8422248Sraf reqp->req_state = AIO_REQ_CANCELED;
8432248Sraf _aio_req_del(aiowp, reqp, ostate);
8442248Sraf (void) _aio_hash_del(reqp->req_resultp);
8452248Sraf (*canceled)++;
8462248Sraf if (reqp == aiowp->work_req) {
8472248Sraf ASSERT(ostate == AIO_REQ_INPROGRESS);
8482248Sraf /*
8492248Sraf * Set the result values now, before _aiodone() is called.
8502248Sraf * We do this because the application can expect aio_return
8512248Sraf * and aio_errno to be set to -1 and ECANCELED, respectively,
8522248Sraf * immediately after a successful return from aiocancel()
8532248Sraf * or aio_cancel().
8542248Sraf */
8552248Sraf _aio_set_result(reqp, -1, ECANCELED);
8562248Sraf (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
8572248Sraf return (0);
8582248Sraf }
8592248Sraf if (!POSIX_AIO(reqp)) {
8602248Sraf _aio_outstand_cnt--;
8612248Sraf _aio_set_result(reqp, -1, ECANCELED);
862*7025Spraks _aio_req_free(reqp);
8632248Sraf return (0);
8642248Sraf }
8652248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
8662248Sraf sig_mutex_unlock(&__aio_mutex);
8672248Sraf _aiodone(reqp, -1, ECANCELED);
8682248Sraf sig_mutex_lock(&__aio_mutex);
8692248Sraf sig_mutex_lock(&aiowp->work_qlock1);
8702248Sraf return (1);
8712248Sraf }
8722248Sraf
8732248Sraf int
_aio_create_worker(aio_req_t * reqp,int mode)8742248Sraf _aio_create_worker(aio_req_t *reqp, int mode)
8752248Sraf {
8762248Sraf aio_worker_t *aiowp, **workers, **nextworker;
8772248Sraf int *aio_workerscnt;
8782248Sraf void *(*func)(void *);
8792248Sraf sigset_t oset;
8802248Sraf int error;
8812248Sraf
8822248Sraf /*
8832248Sraf * Put the new worker thread in the right queue.
8842248Sraf */
8852248Sraf switch (mode) {
8862248Sraf case AIOREAD:
8872248Sraf case AIOWRITE:
8882248Sraf case AIOAREAD:
8892248Sraf case AIOAWRITE:
8902248Sraf #if !defined(_LP64)
8912248Sraf case AIOAREAD64:
8922248Sraf case AIOAWRITE64:
8932248Sraf #endif
8942248Sraf workers = &__workers_rw;
8952248Sraf nextworker = &__nextworker_rw;
8962248Sraf aio_workerscnt = &__rw_workerscnt;
8972248Sraf func = _aio_do_request;
8982248Sraf break;
8992248Sraf case AIONOTIFY:
9002248Sraf workers = &__workers_no;
9012248Sraf nextworker = &__nextworker_no;
9022248Sraf func = _aio_do_notify;
9032248Sraf aio_workerscnt = &__no_workerscnt;
9042248Sraf break;
9052248Sraf default:
9062248Sraf aio_panic("_aio_create_worker: invalid mode");
9072248Sraf break;
9082248Sraf }
9092248Sraf
9102248Sraf if ((aiowp = _aio_worker_alloc()) == NULL)
9112248Sraf return (-1);
9122248Sraf
9132248Sraf if (reqp) {
9142248Sraf reqp->req_state = AIO_REQ_QUEUED;
9152248Sraf reqp->req_worker = aiowp;
9162248Sraf aiowp->work_head1 = reqp;
9172248Sraf aiowp->work_tail1 = reqp;
9182248Sraf aiowp->work_next1 = reqp;
9192248Sraf aiowp->work_count1 = 1;
9202248Sraf aiowp->work_minload1 = 1;
9212248Sraf }
9222248Sraf
9232248Sraf (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
9242248Sraf error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
9254502Spraks THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
9262248Sraf (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
9272248Sraf if (error) {
9282248Sraf if (reqp) {
9292248Sraf reqp->req_state = 0;
9302248Sraf reqp->req_worker = NULL;
9312248Sraf }
9322248Sraf _aio_worker_free(aiowp);
9332248Sraf return (-1);
9342248Sraf }
9352248Sraf
9362248Sraf lmutex_lock(&__aio_mutex);
9372248Sraf (*aio_workerscnt)++;
9382248Sraf if (*workers == NULL) {
9392248Sraf aiowp->work_forw = aiowp;
9402248Sraf aiowp->work_backw = aiowp;
9412248Sraf *nextworker = aiowp;
9422248Sraf *workers = aiowp;
9432248Sraf } else {
9442248Sraf aiowp->work_backw = (*workers)->work_backw;
9452248Sraf aiowp->work_forw = (*workers);
9462248Sraf (*workers)->work_backw->work_forw = aiowp;
9472248Sraf (*workers)->work_backw = aiowp;
9482248Sraf }
9492248Sraf _aio_worker_cnt++;
9502248Sraf lmutex_unlock(&__aio_mutex);
9512248Sraf
9522248Sraf (void) thr_continue(aiowp->work_tid);
9532248Sraf
9542248Sraf return (0);
9552248Sraf }
9562248Sraf
9572248Sraf /*
9582248Sraf * This is the worker's main routine.
9592248Sraf * The task of this function is to execute all queued requests;
9602248Sraf * once the last pending request is executed this function will block
9612248Sraf * in _aio_idle(). A new incoming request must wakeup this thread to
9622248Sraf * restart the work.
9632248Sraf * Every worker has an own work queue. The queue lock is required
9642248Sraf * to synchronize the addition of new requests for this worker or
9652248Sraf * cancellation of pending/running requests.
9662248Sraf *
9672248Sraf * Cancellation scenarios:
9682248Sraf * The cancellation of a request is being done asynchronously using
9692248Sraf * _aio_cancel_req() from another thread context.
9702248Sraf * A queued request can be cancelled in different manners :
9712248Sraf * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
9722248Sraf * - lock the queue -> remove the request -> unlock the queue
9732248Sraf * - this function/thread does not detect this cancellation process
9742248Sraf * b) request is in progress (AIO_REQ_INPROGRESS) :
 *	- this function first allows the cancellation of the running
9762248Sraf * request with the flag "work_cancel_flg=1"
9772248Sraf * see _aio_req_get() -> _aio_cancel_on()
9782248Sraf * During this phase, it is allowed to interrupt the worker
9792248Sraf * thread running the request (this thread) using the SIGAIOCANCEL
9802248Sraf * signal.
9812248Sraf * Once this thread returns from the kernel (because the request
9822248Sraf * is just done), then it must disable a possible cancellation
9832248Sraf * and proceed to finish the request. To disable the cancellation
9842248Sraf * this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
9852248Sraf * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
9862248Sraf * same procedure as in a)
9872248Sraf *
9882248Sraf * To b)
9892248Sraf * This thread uses sigsetjmp() to define the position in the code, where
 * it wishes to continue working in the case that a SIGAIOCANCEL signal
9912248Sraf * is detected.
9922248Sraf * Normally this thread should get the cancellation signal during the
9932248Sraf * kernel phase (reading or writing). In that case the signal handler
9942248Sraf * aiosigcancelhndlr() is activated using the worker thread context,
9952248Sraf * which again will use the siglongjmp() function to break the standard
9962248Sraf * code flow and jump to the "sigsetjmp" position, provided that
9972248Sraf * "work_cancel_flg" is set to "1".
9982248Sraf * Because the "work_cancel_flg" is only manipulated by this worker
9992248Sraf * thread and it can only run on one CPU at a given time, it is not
10002248Sraf * necessary to protect that flag with the queue lock.
10012248Sraf * Returning from the kernel (read or write system call) we must
10022248Sraf * first disable the use of the SIGAIOCANCEL signal and accordingly
10032248Sraf * the use of the siglongjmp() function to prevent a possible deadlock:
 * - It can happen that this worker thread returns from the kernel and
10052248Sraf * blocks in "work_qlock1",
10062248Sraf * - then a second thread cancels the apparently "in progress" request
10072248Sraf * and sends the SIGAIOCANCEL signal to the worker thread,
 * - the worker thread gets assigned the "work_qlock1" and will return
10092248Sraf * from the kernel,
10102248Sraf * - the kernel detects the pending signal and activates the signal
10112248Sraf * handler instead,
10122248Sraf * - if the "work_cancel_flg" is still set then the signal handler
10132248Sraf * should use siglongjmp() to cancel the "in progress" request and
10142248Sraf * it would try to acquire the same work_qlock1 in _aio_req_get()
10152248Sraf * for a second time => deadlock.
10162248Sraf * To avoid that situation we disable the cancellation of the request
10172248Sraf * in progress BEFORE we try to acquire the work_qlock1.
10182248Sraf * In that case the signal handler will not call siglongjmp() and the
10192248Sraf * worker thread will continue running the standard code flow.
10202248Sraf * Then this thread must check the AIO_REQ_CANCELED flag to emulate
10212248Sraf * an eventually required siglongjmp() freeing the work_qlock1 and
10222248Sraf * avoiding a deadlock.
10232248Sraf */
10242248Sraf void *
_aio_do_request(void * arglist)10252248Sraf _aio_do_request(void *arglist)
10262248Sraf {
10272248Sraf aio_worker_t *aiowp = (aio_worker_t *)arglist;
10282248Sraf ulwp_t *self = curthread;
10292248Sraf struct aio_args *arg;
10302248Sraf aio_req_t *reqp; /* current AIO request */
10312248Sraf ssize_t retval;
10325937Sraf int append;
10332248Sraf int error;
10342248Sraf
10352248Sraf if (pthread_setspecific(_aio_key, aiowp) != 0)
10362248Sraf aio_panic("_aio_do_request, pthread_setspecific()");
10372248Sraf (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
10382248Sraf ASSERT(aiowp->work_req == NULL);
10392248Sraf
10402248Sraf /*
10412248Sraf * We resume here when an operation is cancelled.
10422248Sraf * On first entry, aiowp->work_req == NULL, so all
10432248Sraf * we do is block SIGAIOCANCEL.
10442248Sraf */
10452248Sraf (void) sigsetjmp(aiowp->work_jmp_buf, 0);
10462248Sraf ASSERT(self->ul_sigdefer == 0);
10472248Sraf
10482248Sraf sigoff(self); /* block SIGAIOCANCEL */
10492248Sraf if (aiowp->work_req != NULL)
10502248Sraf _aio_finish_request(aiowp, -1, ECANCELED);
10512248Sraf
10522248Sraf for (;;) {
10532248Sraf /*
10542248Sraf * Put completed requests on aio_done_list. This has
10552248Sraf * to be done as part of the main loop to ensure that
10562248Sraf * we don't artificially starve any aiowait'ers.
10572248Sraf */
10582248Sraf if (aiowp->work_done1)
10592248Sraf _aio_work_done(aiowp);
10602248Sraf
10612248Sraf top:
10622248Sraf /* consume any deferred SIGAIOCANCEL signal here */
10632248Sraf sigon(self);
10642248Sraf sigoff(self);
10652248Sraf
10662248Sraf while ((reqp = _aio_req_get(aiowp)) == NULL) {
10672248Sraf if (_aio_idle(aiowp) != 0)
10682248Sraf goto top;
10692248Sraf }
10702248Sraf arg = &reqp->req_args;
10712248Sraf ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
10722248Sraf reqp->req_state == AIO_REQ_CANCELED);
10732248Sraf error = 0;
10742248Sraf
10752248Sraf switch (reqp->req_op) {
10762248Sraf case AIOREAD:
10772248Sraf case AIOAREAD:
10782248Sraf sigon(self); /* unblock SIGAIOCANCEL */
10792248Sraf retval = pread(arg->fd, arg->buf,
10802248Sraf arg->bufsz, arg->offset);
10812248Sraf if (retval == -1) {
10822248Sraf if (errno == ESPIPE) {
10832248Sraf retval = read(arg->fd,
10842248Sraf arg->buf, arg->bufsz);
10852248Sraf if (retval == -1)
10862248Sraf error = errno;
10872248Sraf } else {
10882248Sraf error = errno;
10892248Sraf }
10902248Sraf }
10912248Sraf sigoff(self); /* block SIGAIOCANCEL */
10922248Sraf break;
10932248Sraf case AIOWRITE:
10942248Sraf case AIOAWRITE:
10955937Sraf /*
10965937Sraf * The SUSv3 POSIX spec for aio_write() states:
10975937Sraf * If O_APPEND is set for the file descriptor,
10985937Sraf * write operations append to the file in the
10995937Sraf * same order as the calls were made.
11005937Sraf * but, somewhat inconsistently, it requires pwrite()
11015937Sraf * to ignore the O_APPEND setting. So we have to use
11025937Sraf * fcntl() to get the open modes and call write() for
11035937Sraf * the O_APPEND case.
11045937Sraf */
11055937Sraf append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
11062248Sraf sigon(self); /* unblock SIGAIOCANCEL */
11075937Sraf retval = append?
11085937Sraf write(arg->fd, arg->buf, arg->bufsz) :
11095937Sraf pwrite(arg->fd, arg->buf, arg->bufsz,
11105937Sraf arg->offset);
11112248Sraf if (retval == -1) {
11122248Sraf if (errno == ESPIPE) {
11132248Sraf retval = write(arg->fd,
11142248Sraf arg->buf, arg->bufsz);
11152248Sraf if (retval == -1)
11162248Sraf error = errno;
11172248Sraf } else {
11182248Sraf error = errno;
11192248Sraf }
11202248Sraf }
11212248Sraf sigoff(self); /* block SIGAIOCANCEL */
11222248Sraf break;
11232248Sraf #if !defined(_LP64)
11242248Sraf case AIOAREAD64:
11252248Sraf sigon(self); /* unblock SIGAIOCANCEL */
11262248Sraf retval = pread64(arg->fd, arg->buf,
11272248Sraf arg->bufsz, arg->offset);
11282248Sraf if (retval == -1) {
11292248Sraf if (errno == ESPIPE) {
11302248Sraf retval = read(arg->fd,
11312248Sraf arg->buf, arg->bufsz);
11322248Sraf if (retval == -1)
11332248Sraf error = errno;
11342248Sraf } else {
11352248Sraf error = errno;
11362248Sraf }
11372248Sraf }
11382248Sraf sigoff(self); /* block SIGAIOCANCEL */
11392248Sraf break;
11402248Sraf case AIOAWRITE64:
11415937Sraf /*
11425937Sraf * The SUSv3 POSIX spec for aio_write() states:
11435937Sraf * If O_APPEND is set for the file descriptor,
11445937Sraf * write operations append to the file in the
11455937Sraf * same order as the calls were made.
11465937Sraf * but, somewhat inconsistently, it requires pwrite()
11475937Sraf * to ignore the O_APPEND setting. So we have to use
11485937Sraf * fcntl() to get the open modes and call write() for
11495937Sraf * the O_APPEND case.
11505937Sraf */
11515937Sraf append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
11522248Sraf sigon(self); /* unblock SIGAIOCANCEL */
11535937Sraf retval = append?
11545937Sraf write(arg->fd, arg->buf, arg->bufsz) :
11555937Sraf pwrite64(arg->fd, arg->buf, arg->bufsz,
11565937Sraf arg->offset);
11572248Sraf if (retval == -1) {
11582248Sraf if (errno == ESPIPE) {
11592248Sraf retval = write(arg->fd,
11602248Sraf arg->buf, arg->bufsz);
11612248Sraf if (retval == -1)
11622248Sraf error = errno;
11632248Sraf } else {
11642248Sraf error = errno;
11652248Sraf }
11662248Sraf }
11672248Sraf sigoff(self); /* block SIGAIOCANCEL */
11682248Sraf break;
11692248Sraf #endif /* !defined(_LP64) */
11702248Sraf case AIOFSYNC:
11712248Sraf if (_aio_fsync_del(aiowp, reqp))
11722248Sraf goto top;
11732248Sraf ASSERT(reqp->req_head == NULL);
11742248Sraf /*
11752248Sraf * All writes for this fsync request are now
11762248Sraf * acknowledged. Now make these writes visible
11772248Sraf * and put the final request into the hash table.
11782248Sraf */
11792248Sraf if (reqp->req_state == AIO_REQ_CANCELED) {
11802248Sraf /* EMPTY */;
11812248Sraf } else if (arg->offset == O_SYNC) {
11822248Sraf if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
11832248Sraf error = errno;
11842248Sraf } else {
11852248Sraf if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
11862248Sraf error = errno;
11872248Sraf }
11882248Sraf if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
11892248Sraf aio_panic("_aio_do_request(): AIOFSYNC: "
11902248Sraf "request already in hash table");
11912248Sraf break;
11922248Sraf default:
11932248Sraf aio_panic("_aio_do_request, bad op");
11942248Sraf }
11952248Sraf
11962248Sraf _aio_finish_request(aiowp, retval, error);
11972248Sraf }
11982248Sraf /* NOTREACHED */
11992248Sraf return (NULL);
12002248Sraf }
12012248Sraf
12022248Sraf /*
12032248Sraf * Perform the tail processing for _aio_do_request().
12042248Sraf * The in-progress request may or may not have been cancelled.
12052248Sraf */
12062248Sraf static void
_aio_finish_request(aio_worker_t * aiowp,ssize_t retval,int error)12072248Sraf _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
12082248Sraf {
12092248Sraf aio_req_t *reqp;
12102248Sraf
12112248Sraf sig_mutex_lock(&aiowp->work_qlock1);
12122248Sraf if ((reqp = aiowp->work_req) == NULL)
12132248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
12142248Sraf else {
12152248Sraf aiowp->work_req = NULL;
12162248Sraf if (reqp->req_state == AIO_REQ_CANCELED) {
12172248Sraf retval = -1;
12182248Sraf error = ECANCELED;
12192248Sraf }
12202248Sraf if (!POSIX_AIO(reqp)) {
12214502Spraks int notify;
1222*7025Spraks if (reqp->req_state == AIO_REQ_INPROGRESS) {
1223*7025Spraks reqp->req_state = AIO_REQ_DONE;
1224*7025Spraks _aio_set_result(reqp, retval, error);
1225*7025Spraks }
12262248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
12272248Sraf sig_mutex_lock(&__aio_mutex);
12284502Spraks /*
12294502Spraks * If it was canceled, this request will not be
12304502Spraks * added to done list. Just free it.
12314502Spraks */
12324502Spraks if (error == ECANCELED) {
12332248Sraf _aio_outstand_cnt--;
12344502Spraks _aio_req_free(reqp);
12354502Spraks } else {
12364502Spraks _aio_req_done_cnt++;
12374502Spraks }
12384502Spraks /*
12394502Spraks * Notify any thread that may have blocked
12404502Spraks * because it saw an outstanding request.
12414502Spraks */
12424502Spraks notify = 0;
12434502Spraks if (_aio_outstand_cnt == 0 && _aiowait_flag) {
12444502Spraks notify = 1;
12454502Spraks }
12462248Sraf sig_mutex_unlock(&__aio_mutex);
12474502Spraks if (notify) {
12484502Spraks (void) _kaio(AIONOTIFY);
12494502Spraks }
12502248Sraf } else {
12512248Sraf if (reqp->req_state == AIO_REQ_INPROGRESS)
12522248Sraf reqp->req_state = AIO_REQ_DONE;
12532248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
12542248Sraf _aiodone(reqp, retval, error);
12552248Sraf }
12562248Sraf }
12572248Sraf }
12582248Sraf
12592248Sraf void
_aio_req_mark_done(aio_req_t * reqp)12602248Sraf _aio_req_mark_done(aio_req_t *reqp)
12612248Sraf {
12622248Sraf #if !defined(_LP64)
12632248Sraf if (reqp->req_largefile)
12642248Sraf ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
12652248Sraf else
12662248Sraf #endif
12672248Sraf ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
12682248Sraf }
12692248Sraf
12702248Sraf /*
12712248Sraf * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
12722248Sraf * hopefully to consume one of our queued signals.
12732248Sraf */
12742248Sraf static void
_aio_delay(int ticks)12752248Sraf _aio_delay(int ticks)
12762248Sraf {
12772248Sraf (void) usleep(ticks * (MICROSEC / hz));
12782248Sraf }
12792248Sraf
12802248Sraf /*
12812248Sraf * Actually send the notifications.
12822248Sraf * We could block indefinitely here if the application
12832248Sraf * is not listening for the signal or port notifications.
12842248Sraf */
12852248Sraf static void
send_notification(notif_param_t * npp)12862248Sraf send_notification(notif_param_t *npp)
12872248Sraf {
12882248Sraf extern int __sigqueue(pid_t pid, int signo,
12894502Spraks /* const union sigval */ void *value, int si_code, int block);
12902248Sraf
12912248Sraf if (npp->np_signo)
12922248Sraf (void) __sigqueue(__pid, npp->np_signo, npp->np_user,
12932248Sraf SI_ASYNCIO, 1);
12942248Sraf else if (npp->np_port >= 0)
12952248Sraf (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
12962248Sraf npp->np_event, npp->np_object, npp->np_user);
12972248Sraf
12982248Sraf if (npp->np_lio_signo)
12992248Sraf (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
13002248Sraf SI_ASYNCIO, 1);
13012248Sraf else if (npp->np_lio_port >= 0)
13022248Sraf (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
13032248Sraf npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
13042248Sraf }
13052248Sraf
13062248Sraf /*
13072248Sraf * Asynchronous notification worker.
13082248Sraf */
13092248Sraf void *
_aio_do_notify(void * arg)13102248Sraf _aio_do_notify(void *arg)
13112248Sraf {
13122248Sraf aio_worker_t *aiowp = (aio_worker_t *)arg;
13132248Sraf aio_req_t *reqp;
13142248Sraf
13152248Sraf /*
13162248Sraf * This isn't really necessary. All signals are blocked.
13172248Sraf */
13182248Sraf if (pthread_setspecific(_aio_key, aiowp) != 0)
13192248Sraf aio_panic("_aio_do_notify, pthread_setspecific()");
13202248Sraf
13212248Sraf /*
13222248Sraf * Notifications are never cancelled.
13232248Sraf * All signals remain blocked, forever.
13242248Sraf */
13252248Sraf for (;;) {
13262248Sraf while ((reqp = _aio_req_get(aiowp)) == NULL) {
13272248Sraf if (_aio_idle(aiowp) != 0)
13282248Sraf aio_panic("_aio_do_notify: _aio_idle() failed");
13292248Sraf }
13302248Sraf send_notification(&reqp->req_notify);
13312248Sraf _aio_req_free(reqp);
13322248Sraf }
13332248Sraf
13342248Sraf /* NOTREACHED */
13352248Sraf return (NULL);
13362248Sraf }
13372248Sraf
13382248Sraf /*
13392248Sraf * Do the completion semantics for a request that was either canceled
13402248Sraf * by _aio_cancel_req() or was completed by _aio_do_request().
13412248Sraf */
13422248Sraf static void
_aiodone(aio_req_t * reqp,ssize_t retval,int error)13432248Sraf _aiodone(aio_req_t *reqp, ssize_t retval, int error)
13442248Sraf {
13452248Sraf aio_result_t *resultp = reqp->req_resultp;
13462248Sraf int notify = 0;
13472248Sraf aio_lio_t *head;
13482248Sraf int sigev_none;
13492248Sraf int sigev_signal;
13502248Sraf int sigev_thread;
13512248Sraf int sigev_port;
13522248Sraf notif_param_t np;
13532248Sraf
13542248Sraf /*
13552248Sraf * We call _aiodone() only for Posix I/O.
13562248Sraf */
13572248Sraf ASSERT(POSIX_AIO(reqp));
13582248Sraf
13592248Sraf sigev_none = 0;
13602248Sraf sigev_signal = 0;
13612248Sraf sigev_thread = 0;
13622248Sraf sigev_port = 0;
13632248Sraf np.np_signo = 0;
13642248Sraf np.np_port = -1;
13652248Sraf np.np_lio_signo = 0;
13662248Sraf np.np_lio_port = -1;
13672248Sraf
13682248Sraf switch (reqp->req_sigevent.sigev_notify) {
13692248Sraf case SIGEV_NONE:
13702248Sraf sigev_none = 1;
13712248Sraf break;
13722248Sraf case SIGEV_SIGNAL:
13732248Sraf sigev_signal = 1;
13742248Sraf break;
13752248Sraf case SIGEV_THREAD:
13762248Sraf sigev_thread = 1;
13772248Sraf break;
13782248Sraf case SIGEV_PORT:
13792248Sraf sigev_port = 1;
13802248Sraf break;
13812248Sraf default:
13822248Sraf aio_panic("_aiodone: improper sigev_notify");
13832248Sraf break;
13842248Sraf }
13852248Sraf
13862248Sraf /*
13872248Sraf * Figure out the notification parameters while holding __aio_mutex.
13882248Sraf * Actually perform the notifications after dropping __aio_mutex.
13892248Sraf * This allows us to sleep for a long time (if the notifications
13902248Sraf * incur delays) without impeding other async I/O operations.
13912248Sraf */
13922248Sraf
13932248Sraf sig_mutex_lock(&__aio_mutex);
13942248Sraf
13952248Sraf if (sigev_signal) {
13962248Sraf if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
13972248Sraf notify = 1;
13982248Sraf np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
13992248Sraf } else if (sigev_thread | sigev_port) {
14002248Sraf if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
14012248Sraf notify = 1;
14022248Sraf np.np_event = reqp->req_op;
14032248Sraf if (np.np_event == AIOFSYNC && reqp->req_largefile)
14042248Sraf np.np_event = AIOFSYNC64;
14052248Sraf np.np_object = (uintptr_t)reqp->req_aiocbp;
14062248Sraf np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
14072248Sraf }
14082248Sraf
14092248Sraf if (resultp->aio_errno == EINPROGRESS)
14102248Sraf _aio_set_result(reqp, retval, error);
14112248Sraf
14122248Sraf _aio_outstand_cnt--;
14132248Sraf
14142248Sraf head = reqp->req_head;
14152248Sraf reqp->req_head = NULL;
14162248Sraf
14172248Sraf if (sigev_none) {
14182248Sraf _aio_enq_doneq(reqp);
14192248Sraf reqp = NULL;
14202248Sraf } else {
14212248Sraf (void) _aio_hash_del(resultp);
14222248Sraf _aio_req_mark_done(reqp);
14232248Sraf }
14242248Sraf
14252248Sraf _aio_waitn_wakeup();
14262248Sraf
14272248Sraf /*
14282248Sraf * __aio_waitn() sets AIO_WAIT_INPROGRESS and
14292248Sraf * __aio_suspend() increments "_aio_kernel_suspend"
14302248Sraf * when they are waiting in the kernel for completed I/Os.
14312248Sraf *
14322248Sraf * _kaio(AIONOTIFY) awakes the corresponding function
14332248Sraf * in the kernel; then the corresponding __aio_waitn() or
14342248Sraf * __aio_suspend() function could reap the recently
14352248Sraf * completed I/Os (_aiodone()).
14362248Sraf */
14372248Sraf if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
14382248Sraf (void) _kaio(AIONOTIFY);
14392248Sraf
14402248Sraf sig_mutex_unlock(&__aio_mutex);
14412248Sraf
14422248Sraf if (head != NULL) {
14432248Sraf /*
14442248Sraf * If all the lio requests have completed,
14452248Sraf * prepare to notify the waiting thread.
14462248Sraf */
14472248Sraf sig_mutex_lock(&head->lio_mutex);
14482248Sraf ASSERT(head->lio_refcnt == head->lio_nent);
14492248Sraf if (head->lio_refcnt == 1) {
14502248Sraf int waiting = 0;
14512248Sraf if (head->lio_mode == LIO_WAIT) {
14522248Sraf if ((waiting = head->lio_waiting) != 0)
14532248Sraf (void) cond_signal(&head->lio_cond_cv);
14542248Sraf } else if (head->lio_port < 0) { /* none or signal */
14552248Sraf if ((np.np_lio_signo = head->lio_signo) != 0)
14562248Sraf notify = 1;
14572248Sraf np.np_lio_user = head->lio_sigval.sival_ptr;
14582248Sraf } else { /* thread or port */
14592248Sraf notify = 1;
14602248Sraf np.np_lio_port = head->lio_port;
14612248Sraf np.np_lio_event = head->lio_event;
14622248Sraf np.np_lio_object =
14632248Sraf (uintptr_t)head->lio_sigevent;
14642248Sraf np.np_lio_user = head->lio_sigval.sival_ptr;
14652248Sraf }
14662248Sraf head->lio_nent = head->lio_refcnt = 0;
14672248Sraf sig_mutex_unlock(&head->lio_mutex);
14682248Sraf if (waiting == 0)
14692248Sraf _aio_lio_free(head);
14702248Sraf } else {
14712248Sraf head->lio_nent--;
14722248Sraf head->lio_refcnt--;
14732248Sraf sig_mutex_unlock(&head->lio_mutex);
14742248Sraf }
14752248Sraf }
14762248Sraf
14772248Sraf /*
14782248Sraf * The request is completed; now perform the notifications.
14792248Sraf */
14802248Sraf if (notify) {
14812248Sraf if (reqp != NULL) {
14822248Sraf /*
14832248Sraf * We usually put the request on the notification
14842248Sraf * queue because we don't want to block and delay
14852248Sraf * other operations behind us in the work queue.
14862248Sraf * Also we must never block on a cancel notification
14872248Sraf * because we are being called from an application
14882248Sraf * thread in this case and that could lead to deadlock
14892248Sraf * if no other thread is receiving notificatins.
14902248Sraf */
14912248Sraf reqp->req_notify = np;
14922248Sraf reqp->req_op = AIONOTIFY;
14932248Sraf _aio_req_add(reqp, &__workers_no, AIONOTIFY);
14942248Sraf reqp = NULL;
14952248Sraf } else {
14962248Sraf /*
14972248Sraf * We already put the request on the done queue,
14982248Sraf * so we can't queue it to the notification queue.
14992248Sraf * Just do the notification directly.
15002248Sraf */
15012248Sraf send_notification(&np);
15022248Sraf }
15032248Sraf }
15042248Sraf
15052248Sraf if (reqp != NULL)
15062248Sraf _aio_req_free(reqp);
15072248Sraf }
15082248Sraf
15092248Sraf /*
15102248Sraf * Delete fsync requests from list head until there is
15112248Sraf * only one left. Return 0 when there is only one,
15122248Sraf * otherwise return a non-zero value.
15132248Sraf */
15142248Sraf static int
_aio_fsync_del(aio_worker_t * aiowp,aio_req_t * reqp)15152248Sraf _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
15162248Sraf {
15172248Sraf aio_lio_t *head = reqp->req_head;
15182248Sraf int rval = 0;
15192248Sraf
15202248Sraf ASSERT(reqp == aiowp->work_req);
15212248Sraf sig_mutex_lock(&aiowp->work_qlock1);
15222248Sraf sig_mutex_lock(&head->lio_mutex);
15232248Sraf if (head->lio_refcnt > 1) {
15242248Sraf head->lio_refcnt--;
15252248Sraf head->lio_nent--;
15262248Sraf aiowp->work_req = NULL;
15272248Sraf sig_mutex_unlock(&head->lio_mutex);
15282248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
15292248Sraf sig_mutex_lock(&__aio_mutex);
15302248Sraf _aio_outstand_cnt--;
15312248Sraf _aio_waitn_wakeup();
15322248Sraf sig_mutex_unlock(&__aio_mutex);
15332248Sraf _aio_req_free(reqp);
15342248Sraf return (1);
15352248Sraf }
15362248Sraf ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
15372248Sraf reqp->req_head = NULL;
15382248Sraf if (head->lio_canned)
15392248Sraf reqp->req_state = AIO_REQ_CANCELED;
15402248Sraf if (head->lio_mode == LIO_DESTROY) {
15412248Sraf aiowp->work_req = NULL;
15422248Sraf rval = 1;
15432248Sraf }
15442248Sraf sig_mutex_unlock(&head->lio_mutex);
15452248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
15462248Sraf head->lio_refcnt--;
15472248Sraf head->lio_nent--;
15482248Sraf _aio_lio_free(head);
15492248Sraf if (rval != 0)
15502248Sraf _aio_req_free(reqp);
15512248Sraf return (rval);
15522248Sraf }
15532248Sraf
15542248Sraf /*
15552248Sraf * A worker is set idle when its work queue is empty.
15562248Sraf * The worker checks again that it has no more work
15572248Sraf * and then goes to sleep waiting for more work.
15582248Sraf */
15592248Sraf int
_aio_idle(aio_worker_t * aiowp)15602248Sraf _aio_idle(aio_worker_t *aiowp)
15612248Sraf {
15622248Sraf int error = 0;
15632248Sraf
15642248Sraf sig_mutex_lock(&aiowp->work_qlock1);
15652248Sraf if (aiowp->work_count1 == 0) {
15662248Sraf ASSERT(aiowp->work_minload1 == 0);
15672248Sraf aiowp->work_idleflg = 1;
15682248Sraf /*
15692248Sraf * A cancellation handler is not needed here.
15702248Sraf * aio worker threads are never cancelled via pthread_cancel().
15712248Sraf */
15722248Sraf error = sig_cond_wait(&aiowp->work_idle_cv,
15732248Sraf &aiowp->work_qlock1);
15742248Sraf /*
15752248Sraf * The idle flag is normally cleared before worker is awakened
15762248Sraf * by aio_req_add(). On error (EINTR), we clear it ourself.
15772248Sraf */
15782248Sraf if (error)
15792248Sraf aiowp->work_idleflg = 0;
15802248Sraf }
15812248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
15822248Sraf return (error);
15832248Sraf }
15842248Sraf
15852248Sraf /*
15862248Sraf * A worker's completed AIO requests are placed onto a global
15872248Sraf * done queue. The application is only sent a SIGIO signal if
15882248Sraf * the process has a handler enabled and it is not waiting via
15892248Sraf * aiowait().
15902248Sraf */
15912248Sraf static void
_aio_work_done(aio_worker_t * aiowp)15922248Sraf _aio_work_done(aio_worker_t *aiowp)
15932248Sraf {
15942248Sraf aio_req_t *reqp;
15952248Sraf
1596*7025Spraks sig_mutex_lock(&__aio_mutex);
15972248Sraf sig_mutex_lock(&aiowp->work_qlock1);
15982248Sraf reqp = aiowp->work_prev1;
15992248Sraf reqp->req_next = NULL;
16002248Sraf aiowp->work_done1 = 0;
16012248Sraf aiowp->work_tail1 = aiowp->work_next1;
16022248Sraf if (aiowp->work_tail1 == NULL)
16032248Sraf aiowp->work_head1 = NULL;
16042248Sraf aiowp->work_prev1 = NULL;
16052248Sraf _aio_outstand_cnt--;
16062248Sraf _aio_req_done_cnt--;
1607*7025Spraks if (reqp->req_state == AIO_REQ_CANCELED) {
1608*7025Spraks /*
1609*7025Spraks * Request got cancelled after it was marked done. This can
1610*7025Spraks * happen because _aio_finish_request() marks it AIO_REQ_DONE
1611*7025Spraks * and drops all locks. Don't add the request to the done
1612*7025Spraks * queue and just discard it.
1613*7025Spraks */
1614*7025Spraks sig_mutex_unlock(&aiowp->work_qlock1);
1615*7025Spraks _aio_req_free(reqp);
1616*7025Spraks if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1617*7025Spraks sig_mutex_unlock(&__aio_mutex);
1618*7025Spraks (void) _kaio(AIONOTIFY);
1619*7025Spraks } else {
1620*7025Spraks sig_mutex_unlock(&__aio_mutex);
1621*7025Spraks }
1622*7025Spraks return;
1623*7025Spraks }
1624*7025Spraks sig_mutex_unlock(&aiowp->work_qlock1);
1625*7025Spraks _aio_donecnt++;
16262248Sraf ASSERT(_aio_donecnt > 0 &&
16272248Sraf _aio_outstand_cnt >= 0 &&
16282248Sraf _aio_req_done_cnt >= 0);
16292248Sraf ASSERT(reqp != NULL);
16302248Sraf
16312248Sraf if (_aio_done_tail == NULL) {
16322248Sraf _aio_done_head = _aio_done_tail = reqp;
16332248Sraf } else {
16342248Sraf _aio_done_head->req_next = reqp;
16352248Sraf _aio_done_head = reqp;
16362248Sraf }
16372248Sraf
16382248Sraf if (_aiowait_flag) {
16392248Sraf sig_mutex_unlock(&__aio_mutex);
16402248Sraf (void) _kaio(AIONOTIFY);
16412248Sraf } else {
16422248Sraf sig_mutex_unlock(&__aio_mutex);
16432248Sraf if (_sigio_enabled)
16442248Sraf (void) kill(__pid, SIGIO);
16452248Sraf }
16462248Sraf }
16472248Sraf
16482248Sraf /*
16492248Sraf * The done queue consists of AIO requests that are in either the
16502248Sraf * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled
16512248Sraf * are discarded. If the done queue is empty then NULL is returned.
16522248Sraf * Otherwise the address of a done aio_result_t is returned.
16532248Sraf */
16542248Sraf aio_result_t *
_aio_req_done(void)16552248Sraf _aio_req_done(void)
16562248Sraf {
16572248Sraf aio_req_t *reqp;
16582248Sraf aio_result_t *resultp;
16592248Sraf
16602248Sraf ASSERT(MUTEX_HELD(&__aio_mutex));
16612248Sraf
16622248Sraf if ((reqp = _aio_done_tail) != NULL) {
16632248Sraf if ((_aio_done_tail = reqp->req_next) == NULL)
16642248Sraf _aio_done_head = NULL;
16652248Sraf ASSERT(_aio_donecnt > 0);
16662248Sraf _aio_donecnt--;
16672248Sraf (void) _aio_hash_del(reqp->req_resultp);
16682248Sraf resultp = reqp->req_resultp;
16692248Sraf ASSERT(reqp->req_state == AIO_REQ_DONE);
16702248Sraf _aio_req_free(reqp);
16712248Sraf return (resultp);
16722248Sraf }
16732248Sraf /* is queue empty? */
16742248Sraf if (reqp == NULL && _aio_outstand_cnt == 0) {
16752248Sraf return ((aio_result_t *)-1);
16762248Sraf }
16772248Sraf return (NULL);
16782248Sraf }
16792248Sraf
16802248Sraf /*
16812248Sraf * Set the return and errno values for the application's use.
16822248Sraf *
16832248Sraf * For the Posix interfaces, we must set the return value first followed
16842248Sraf * by the errno value because the Posix interfaces allow for a change
16852248Sraf * in the errno value from EINPROGRESS to something else to signal
16862248Sraf * the completion of the asynchronous request.
16872248Sraf *
16882248Sraf * The opposite is true for the Solaris interfaces. These allow for
16892248Sraf * a change in the return value from AIO_INPROGRESS to something else
16902248Sraf * to signal the completion of the asynchronous request.
16912248Sraf */
16922248Sraf void
_aio_set_result(aio_req_t * reqp,ssize_t retval,int error)16932248Sraf _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
16942248Sraf {
16952248Sraf aio_result_t *resultp = reqp->req_resultp;
16962248Sraf
16972248Sraf if (POSIX_AIO(reqp)) {
16982248Sraf resultp->aio_return = retval;
16992248Sraf membar_producer();
17002248Sraf resultp->aio_errno = error;
17012248Sraf } else {
17022248Sraf resultp->aio_errno = error;
17032248Sraf membar_producer();
17042248Sraf resultp->aio_return = retval;
17052248Sraf }
17062248Sraf }
17072248Sraf
17082248Sraf /*
17092248Sraf * Add an AIO request onto the next work queue.
17102248Sraf * A circular list of workers is used to choose the next worker.
17112248Sraf */
17122248Sraf void
_aio_req_add(aio_req_t * reqp,aio_worker_t ** nextworker,int mode)17132248Sraf _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
17142248Sraf {
17152248Sraf ulwp_t *self = curthread;
17162248Sraf aio_worker_t *aiowp;
17172248Sraf aio_worker_t *first;
17182248Sraf int load_bal_flg = 1;
17192248Sraf int found;
17202248Sraf
17212248Sraf ASSERT(reqp->req_state != AIO_REQ_DONEQ);
17222248Sraf reqp->req_next = NULL;
17232248Sraf /*
17242248Sraf * Try to acquire the next worker's work queue. If it is locked,
17252248Sraf * then search the list of workers until a queue is found unlocked,
17262248Sraf * or until the list is completely traversed at which point another
17272248Sraf * worker will be created.
17282248Sraf */
17292248Sraf sigoff(self); /* defer SIGIO */
17302248Sraf sig_mutex_lock(&__aio_mutex);
17312248Sraf first = aiowp = *nextworker;
17322248Sraf if (mode != AIONOTIFY)
17332248Sraf _aio_outstand_cnt++;
17342248Sraf sig_mutex_unlock(&__aio_mutex);
17352248Sraf
17362248Sraf switch (mode) {
17372248Sraf case AIOREAD:
17382248Sraf case AIOWRITE:
17392248Sraf case AIOAREAD:
17402248Sraf case AIOAWRITE:
17412248Sraf #if !defined(_LP64)
17422248Sraf case AIOAREAD64:
17432248Sraf case AIOAWRITE64:
17442248Sraf #endif
17452248Sraf /* try to find an idle worker */
17462248Sraf found = 0;
17472248Sraf do {
17482248Sraf if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
17492248Sraf if (aiowp->work_idleflg) {
17502248Sraf found = 1;
17512248Sraf break;
17522248Sraf }
17532248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
17542248Sraf }
17552248Sraf } while ((aiowp = aiowp->work_forw) != first);
17562248Sraf
17572248Sraf if (found) {
17582248Sraf aiowp->work_minload1++;
17592248Sraf break;
17602248Sraf }
17612248Sraf
17622248Sraf /* try to acquire some worker's queue lock */
17632248Sraf do {
17642248Sraf if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
17652248Sraf found = 1;
17662248Sraf break;
17672248Sraf }
17682248Sraf } while ((aiowp = aiowp->work_forw) != first);
17692248Sraf
17702248Sraf /*
17712248Sraf * Create more workers when the workers appear overloaded.
17722248Sraf * Either all the workers are busy draining their queues
17732248Sraf * or no worker's queue lock could be acquired.
17742248Sraf */
17752248Sraf if (!found) {
17762248Sraf if (_aio_worker_cnt < _max_workers) {
17772248Sraf if (_aio_create_worker(reqp, mode))
17782248Sraf aio_panic("_aio_req_add: add worker");
17792248Sraf sigon(self); /* reenable SIGIO */
17802248Sraf return;
17812248Sraf }
17822248Sraf
17832248Sraf /*
17842248Sraf * No worker available and we have created
17852248Sraf * _max_workers, keep going through the
17862248Sraf * list slowly until we get a lock
17872248Sraf */
17882248Sraf while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
17892248Sraf /*
17902248Sraf * give someone else a chance
17912248Sraf */
17922248Sraf _aio_delay(1);
17932248Sraf aiowp = aiowp->work_forw;
17942248Sraf }
17952248Sraf }
17962248Sraf
17972248Sraf ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
17982248Sraf if (_aio_worker_cnt < _max_workers &&
17992248Sraf aiowp->work_minload1 >= _minworkload) {
18002248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
18012248Sraf sig_mutex_lock(&__aio_mutex);
18022248Sraf *nextworker = aiowp->work_forw;
18032248Sraf sig_mutex_unlock(&__aio_mutex);
18042248Sraf if (_aio_create_worker(reqp, mode))
18052248Sraf aio_panic("aio_req_add: add worker");
18062248Sraf sigon(self); /* reenable SIGIO */
18072248Sraf return;
18082248Sraf }
18092248Sraf aiowp->work_minload1++;
18102248Sraf break;
18112248Sraf case AIOFSYNC:
18122248Sraf case AIONOTIFY:
18132248Sraf load_bal_flg = 0;
18142248Sraf sig_mutex_lock(&aiowp->work_qlock1);
18152248Sraf break;
18162248Sraf default:
18172248Sraf aio_panic("_aio_req_add: invalid mode");
18182248Sraf break;
18192248Sraf }
18202248Sraf /*
18212248Sraf * Put request onto worker's work queue.
18222248Sraf */
18232248Sraf if (aiowp->work_tail1 == NULL) {
18242248Sraf ASSERT(aiowp->work_count1 == 0);
18252248Sraf aiowp->work_tail1 = reqp;
18262248Sraf aiowp->work_next1 = reqp;
18272248Sraf } else {
18282248Sraf aiowp->work_head1->req_next = reqp;
18292248Sraf if (aiowp->work_next1 == NULL)
18302248Sraf aiowp->work_next1 = reqp;
18312248Sraf }
18322248Sraf reqp->req_state = AIO_REQ_QUEUED;
18332248Sraf reqp->req_worker = aiowp;
18342248Sraf aiowp->work_head1 = reqp;
18352248Sraf /*
18362248Sraf * Awaken worker if it is not currently active.
18372248Sraf */
18382248Sraf if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
18392248Sraf aiowp->work_idleflg = 0;
18402248Sraf (void) cond_signal(&aiowp->work_idle_cv);
18412248Sraf }
18422248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
18432248Sraf
18442248Sraf if (load_bal_flg) {
18452248Sraf sig_mutex_lock(&__aio_mutex);
18462248Sraf *nextworker = aiowp->work_forw;
18472248Sraf sig_mutex_unlock(&__aio_mutex);
18482248Sraf }
18492248Sraf sigon(self); /* reenable SIGIO */
18502248Sraf }
18512248Sraf
18522248Sraf /*
18532248Sraf * Get an AIO request for a specified worker.
18542248Sraf * If the work queue is empty, return NULL.
18552248Sraf */
18562248Sraf aio_req_t *
_aio_req_get(aio_worker_t * aiowp)18572248Sraf _aio_req_get(aio_worker_t *aiowp)
18582248Sraf {
18592248Sraf aio_req_t *reqp;
18602248Sraf
18612248Sraf sig_mutex_lock(&aiowp->work_qlock1);
18622248Sraf if ((reqp = aiowp->work_next1) != NULL) {
18632248Sraf /*
18642248Sraf * Remove a POSIX request from the queue; the
18652248Sraf * request queue is a singularly linked list
18662248Sraf * with a previous pointer. The request is
18672248Sraf * removed by updating the previous pointer.
18682248Sraf *
18692248Sraf * Non-posix requests are left on the queue
18702248Sraf * to eventually be placed on the done queue.
18712248Sraf */
18722248Sraf
18732248Sraf if (POSIX_AIO(reqp)) {
18742248Sraf if (aiowp->work_prev1 == NULL) {
18752248Sraf aiowp->work_tail1 = reqp->req_next;
18762248Sraf if (aiowp->work_tail1 == NULL)
18772248Sraf aiowp->work_head1 = NULL;
18782248Sraf } else {
18792248Sraf aiowp->work_prev1->req_next = reqp->req_next;
18802248Sraf if (aiowp->work_head1 == reqp)
18812248Sraf aiowp->work_head1 = reqp->req_next;
18822248Sraf }
18832248Sraf
18842248Sraf } else {
18852248Sraf aiowp->work_prev1 = reqp;
18862248Sraf ASSERT(aiowp->work_done1 >= 0);
18872248Sraf aiowp->work_done1++;
18882248Sraf }
18892248Sraf ASSERT(reqp != reqp->req_next);
18902248Sraf aiowp->work_next1 = reqp->req_next;
18912248Sraf ASSERT(aiowp->work_count1 >= 1);
18922248Sraf aiowp->work_count1--;
18932248Sraf switch (reqp->req_op) {
18942248Sraf case AIOREAD:
18952248Sraf case AIOWRITE:
18962248Sraf case AIOAREAD:
18972248Sraf case AIOAWRITE:
18982248Sraf #if !defined(_LP64)
18992248Sraf case AIOAREAD64:
19002248Sraf case AIOAWRITE64:
19012248Sraf #endif
19022248Sraf ASSERT(aiowp->work_minload1 > 0);
19032248Sraf aiowp->work_minload1--;
19042248Sraf break;
19052248Sraf }
19062248Sraf reqp->req_state = AIO_REQ_INPROGRESS;
19072248Sraf }
19082248Sraf aiowp->work_req = reqp;
19092248Sraf ASSERT(reqp != NULL || aiowp->work_count1 == 0);
19102248Sraf sig_mutex_unlock(&aiowp->work_qlock1);
19112248Sraf return (reqp);
19122248Sraf }
19132248Sraf
19142248Sraf static void
_aio_req_del(aio_worker_t * aiowp,aio_req_t * reqp,int ostate)19152248Sraf _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
19162248Sraf {
19172248Sraf aio_req_t **last;
19182248Sraf aio_req_t *lastrp;
19192248Sraf aio_req_t *next;
19202248Sraf
19212248Sraf ASSERT(aiowp != NULL);
19222248Sraf ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
19232248Sraf if (POSIX_AIO(reqp)) {
19242248Sraf if (ostate != AIO_REQ_QUEUED)
19252248Sraf return;
19262248Sraf }
19272248Sraf last = &aiowp->work_tail1;
19282248Sraf lastrp = aiowp->work_tail1;
19292248Sraf ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
19302248Sraf while ((next = *last) != NULL) {
19312248Sraf if (next == reqp) {
19322248Sraf *last = next->req_next;
19332248Sraf if (aiowp->work_next1 == next)
19342248Sraf aiowp->work_next1 = next->req_next;
19352248Sraf
1936*7025Spraks /*
1937*7025Spraks * if this is the first request on the queue, move
1938*7025Spraks * the lastrp pointer forward.
1939*7025Spraks */
1940*7025Spraks if (lastrp == next)
1941*7025Spraks lastrp = next->req_next;
1942*7025Spraks
1943*7025Spraks /*
1944*7025Spraks * if this request is pointed by work_head1, then
1945*7025Spraks * make work_head1 point to the last request that is
1946*7025Spraks * present on the queue.
1947*7025Spraks */
1948*7025Spraks if (aiowp->work_head1 == next)
1949*7025Spraks aiowp->work_head1 = lastrp;
1950*7025Spraks
1951*7025Spraks /*
1952*7025Spraks * work_prev1 is used only in non posix case and it
1953*7025Spraks * points to the current AIO_REQ_INPROGRESS request.
1954*7025Spraks * If work_prev1 points to this request which is being
1955*7025Spraks * deleted, make work_prev1 NULL and set work_done1
1956*7025Spraks * to 0.
1957*7025Spraks *
1958*7025Spraks * A worker thread can be processing only one request
1959*7025Spraks * at a time.
1960*7025Spraks */
1961*7025Spraks if (aiowp->work_prev1 == next) {
1962*7025Spraks ASSERT(ostate == AIO_REQ_INPROGRESS &&
1963*7025Spraks !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
1964*7025Spraks aiowp->work_prev1 = NULL;
1965*7025Spraks aiowp->work_done1--;
19662248Sraf }
19672248Sraf
19682248Sraf if (ostate == AIO_REQ_QUEUED) {
19692248Sraf ASSERT(aiowp->work_count1 >= 1);
19702248Sraf aiowp->work_count1--;
19712248Sraf ASSERT(aiowp->work_minload1 >= 1);
19722248Sraf aiowp->work_minload1--;
19732248Sraf }
19742248Sraf return;
19752248Sraf }
19762248Sraf last = &next->req_next;
19772248Sraf lastrp = next;
19782248Sraf }
19792248Sraf /* NOTREACHED */
19802248Sraf }
19812248Sraf
19822248Sraf static void
_aio_enq_doneq(aio_req_t * reqp)19832248Sraf _aio_enq_doneq(aio_req_t *reqp)
19842248Sraf {
19852248Sraf if (_aio_doneq == NULL) {
19862248Sraf _aio_doneq = reqp;
19872248Sraf reqp->req_next = reqp->req_prev = reqp;
19882248Sraf } else {
19892248Sraf reqp->req_next = _aio_doneq;
19902248Sraf reqp->req_prev = _aio_doneq->req_prev;
19912248Sraf _aio_doneq->req_prev->req_next = reqp;
19922248Sraf _aio_doneq->req_prev = reqp;
19932248Sraf }
19942248Sraf reqp->req_state = AIO_REQ_DONEQ;
19952248Sraf _aio_doneq_cnt++;
19962248Sraf }
19972248Sraf
19982248Sraf /*
19992248Sraf * caller owns the _aio_mutex
20002248Sraf */
20012248Sraf aio_req_t *
_aio_req_remove(aio_req_t * reqp)20022248Sraf _aio_req_remove(aio_req_t *reqp)
20032248Sraf {
20042248Sraf if (reqp && reqp->req_state != AIO_REQ_DONEQ)
20052248Sraf return (NULL);
20062248Sraf
20072248Sraf if (reqp) {
20082248Sraf /* request in done queue */
20092248Sraf if (_aio_doneq == reqp)
20102248Sraf _aio_doneq = reqp->req_next;
20112248Sraf if (_aio_doneq == reqp) {
20122248Sraf /* only one request on queue */
20132248Sraf _aio_doneq = NULL;
20142248Sraf } else {
20152248Sraf aio_req_t *tmp = reqp->req_next;
20162248Sraf reqp->req_prev->req_next = tmp;
20172248Sraf tmp->req_prev = reqp->req_prev;
20182248Sraf }
20192248Sraf } else if ((reqp = _aio_doneq) != NULL) {
20202248Sraf if (reqp == reqp->req_next) {
20212248Sraf /* only one request on queue */
20222248Sraf _aio_doneq = NULL;
20232248Sraf } else {
20242248Sraf reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
20252248Sraf _aio_doneq->req_prev = reqp->req_prev;
20262248Sraf }
20272248Sraf }
20282248Sraf if (reqp) {
20292248Sraf _aio_doneq_cnt--;
20302248Sraf reqp->req_next = reqp->req_prev = reqp;
20312248Sraf reqp->req_state = AIO_REQ_DONE;
20322248Sraf }
20332248Sraf return (reqp);
20342248Sraf }
20352248Sraf
20362248Sraf /*
20372248Sraf * An AIO request is identified by an aio_result_t pointer. The library
20382248Sraf * maps this aio_result_t pointer to its internal representation using a
20392248Sraf * hash table. This function adds an aio_result_t pointer to the hash table.
20402248Sraf */
20412248Sraf static int
_aio_hash_insert(aio_result_t * resultp,aio_req_t * reqp)20422248Sraf _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
20432248Sraf {
20442248Sraf aio_hash_t *hashp;
20452248Sraf aio_req_t **prev;
20462248Sraf aio_req_t *next;
20472248Sraf
20482248Sraf hashp = _aio_hash + AIOHASH(resultp);
20492248Sraf lmutex_lock(&hashp->hash_lock);
20502248Sraf prev = &hashp->hash_ptr;
20512248Sraf while ((next = *prev) != NULL) {
20522248Sraf if (resultp == next->req_resultp) {
20532248Sraf lmutex_unlock(&hashp->hash_lock);
20542248Sraf return (-1);
20552248Sraf }
20562248Sraf prev = &next->req_link;
20572248Sraf }
20582248Sraf *prev = reqp;
20592248Sraf ASSERT(reqp->req_link == NULL);
20602248Sraf lmutex_unlock(&hashp->hash_lock);
20612248Sraf return (0);
20622248Sraf }
20632248Sraf
20642248Sraf /*
20652248Sraf * Remove an entry from the hash table.
20662248Sraf */
20672248Sraf aio_req_t *
_aio_hash_del(aio_result_t * resultp)20682248Sraf _aio_hash_del(aio_result_t *resultp)
20692248Sraf {
20702248Sraf aio_hash_t *hashp;
20712248Sraf aio_req_t **prev;
20722248Sraf aio_req_t *next = NULL;
20732248Sraf
20742248Sraf if (_aio_hash != NULL) {
20752248Sraf hashp = _aio_hash + AIOHASH(resultp);
20762248Sraf lmutex_lock(&hashp->hash_lock);
20772248Sraf prev = &hashp->hash_ptr;
20782248Sraf while ((next = *prev) != NULL) {
20792248Sraf if (resultp == next->req_resultp) {
20802248Sraf *prev = next->req_link;
20812248Sraf next->req_link = NULL;
20822248Sraf break;
20832248Sraf }
20842248Sraf prev = &next->req_link;
20852248Sraf }
20862248Sraf lmutex_unlock(&hashp->hash_lock);
20872248Sraf }
20882248Sraf return (next);
20892248Sraf }
20902248Sraf
20912248Sraf /*
20922248Sraf * find an entry in the hash table
20932248Sraf */
20942248Sraf aio_req_t *
_aio_hash_find(aio_result_t * resultp)20952248Sraf _aio_hash_find(aio_result_t *resultp)
20962248Sraf {
20972248Sraf aio_hash_t *hashp;
20982248Sraf aio_req_t **prev;
20992248Sraf aio_req_t *next = NULL;
21002248Sraf
21012248Sraf if (_aio_hash != NULL) {
21022248Sraf hashp = _aio_hash + AIOHASH(resultp);
21032248Sraf lmutex_lock(&hashp->hash_lock);
21042248Sraf prev = &hashp->hash_ptr;
21052248Sraf while ((next = *prev) != NULL) {
21062248Sraf if (resultp == next->req_resultp)
21072248Sraf break;
21082248Sraf prev = &next->req_link;
21092248Sraf }
21102248Sraf lmutex_unlock(&hashp->hash_lock);
21112248Sraf }
21122248Sraf return (next);
21132248Sraf }
21142248Sraf
21152248Sraf /*
21162248Sraf * AIO interface for POSIX
21172248Sraf */
21182248Sraf int
_aio_rw(aiocb_t * aiocbp,aio_lio_t * lio_head,aio_worker_t ** nextworker,int mode,int flg)21192248Sraf _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
21202248Sraf int mode, int flg)
21212248Sraf {
21222248Sraf aio_req_t *reqp;
21232248Sraf aio_args_t *ap;
21242248Sraf int kerr;
21252248Sraf
21262248Sraf if (aiocbp == NULL) {
21272248Sraf errno = EINVAL;
21282248Sraf return (-1);
21292248Sraf }
21302248Sraf
21312248Sraf /* initialize kaio */
21322248Sraf if (!_kaio_ok)
21332248Sraf _kaio_init();
21342248Sraf
21352248Sraf aiocbp->aio_state = NOCHECK;
21362248Sraf
21372248Sraf /*
21382248Sraf * If we have been called because a list I/O
21392248Sraf * kaio() failed, we dont want to repeat the
21402248Sraf * system call
21412248Sraf */
21422248Sraf
21432248Sraf if (flg & AIO_KAIO) {
21442248Sraf /*
21452248Sraf * Try kernel aio first.
21462248Sraf * If errno is ENOTSUP/EBADFD,
21472248Sraf * fall back to the thread implementation.
21482248Sraf */
21492248Sraf if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
21502248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS;
21512248Sraf aiocbp->aio_state = CHECK;
21522248Sraf kerr = (int)_kaio(mode, aiocbp);
21532248Sraf if (kerr == 0)
21542248Sraf return (0);
21552248Sraf if (errno != ENOTSUP && errno != EBADFD) {
21562248Sraf aiocbp->aio_resultp.aio_errno = errno;
21572248Sraf aiocbp->aio_resultp.aio_return = -1;
21582248Sraf aiocbp->aio_state = NOCHECK;
21592248Sraf return (-1);
21602248Sraf }
21612248Sraf if (errno == EBADFD)
21622248Sraf SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
21632248Sraf }
21642248Sraf }
21652248Sraf
21662248Sraf aiocbp->aio_resultp.aio_errno = EINPROGRESS;
21672248Sraf aiocbp->aio_state = USERAIO;
21682248Sraf
21692248Sraf if (!__uaio_ok && __uaio_init() == -1)
21702248Sraf return (-1);
21712248Sraf
21722248Sraf if ((reqp = _aio_req_alloc()) == NULL) {
21732248Sraf errno = EAGAIN;
21742248Sraf return (-1);
21752248Sraf }
21762248Sraf
21772248Sraf /*
21782248Sraf * If an LIO request, add the list head to the aio request
21792248Sraf */
21802248Sraf reqp->req_head = lio_head;
21812248Sraf reqp->req_type = AIO_POSIX_REQ;
21822248Sraf reqp->req_op = mode;
21832248Sraf reqp->req_largefile = 0;
21842248Sraf
21852248Sraf if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
21862248Sraf reqp->req_sigevent.sigev_notify = SIGEV_NONE;
21872248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
21882248Sraf reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
21892248Sraf reqp->req_sigevent.sigev_signo =
21902248Sraf aiocbp->aio_sigevent.sigev_signo;
21912248Sraf reqp->req_sigevent.sigev_value.sival_ptr =
21922248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr;
21932248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
21942248Sraf port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
21952248Sraf reqp->req_sigevent.sigev_notify = SIGEV_PORT;
21962248Sraf /*
21972248Sraf * Reuse the sigevent structure to contain the port number
21982248Sraf * and the user value. Same for SIGEV_THREAD, below.
21992248Sraf */
22002248Sraf reqp->req_sigevent.sigev_signo =
22012248Sraf pn->portnfy_port;
22022248Sraf reqp->req_sigevent.sigev_value.sival_ptr =
22032248Sraf pn->portnfy_user;
22042248Sraf } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
22052248Sraf reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
22062248Sraf /*
22072248Sraf * The sigevent structure contains the port number
22082248Sraf * and the user value. Same for SIGEV_PORT, above.
22092248Sraf */
22102248Sraf reqp->req_sigevent.sigev_signo =
22112248Sraf aiocbp->aio_sigevent.sigev_signo;
22122248Sraf reqp->req_sigevent.sigev_value.sival_ptr =
22132248Sraf aiocbp->aio_sigevent.sigev_value.sival_ptr;
22142248Sraf }
22152248Sraf
22162248Sraf reqp->req_resultp = &aiocbp->aio_resultp;
22172248Sraf reqp->req_aiocbp = aiocbp;
22182248Sraf ap = &reqp->req_args;
22192248Sraf ap->fd = aiocbp->aio_fildes;
22202248Sraf ap->buf = (caddr_t)aiocbp->aio_buf;
22212248Sraf ap->bufsz = aiocbp->aio_nbytes;
22222248Sraf ap->offset = aiocbp->aio_offset;
22232248Sraf
22242248Sraf if ((flg & AIO_NO_DUPS) &&
22252248Sraf _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
22262248Sraf aio_panic("_aio_rw(): request already in hash table");
22272248Sraf _aio_req_free(reqp);
22282248Sraf errno = EINVAL;
22292248Sraf return (-1);
22302248Sraf }
22312248Sraf _aio_req_add(reqp, nextworker, mode);
22322248Sraf return (0);
22332248Sraf }
22342248Sraf
#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX.
 *
 * Large-file counterpart of _aio_rw(): takes an aiocb64_t and marks the
 * request with req_largefile = 1.  When 'flg' contains AIO_KAIO, kernel
 * aio is tried first; if that fails with ENOTSUP or EBADFD (or kaio is
 * not usable for this descriptor), the request is built as an aio_req_t
 * and handed to _aio_req_add().  When 'flg' contains AIO_NO_DUPS, the
 * request is also entered into the hash table.
 *
 * Returns 0 on success.  Returns -1 on failure: errno is EINVAL for a
 * NULL aiocbp, EAGAIN when no request structure can be allocated, or the
 * error left by _kaio() or __uaio_init().
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * Try kernel aio first, unless we were called because a list I/O
	 * kaio() already failed (AIO_KAIO clear), in which case the system
	 * call is not repeated.  On ENOTSUP/EBADFD fall back to the
	 * thread implementation.
	 */
	if ((flg & AIO_KAIO) && _kaio_ok > 0 &&
	    KAIO_SUPPORTED(aiocbp->aio_fildes)) {
		aiocbp->aio_resultp.aio_errno = EINPROGRESS;
		aiocbp->aio_state = CHECK;
		if ((int)_kaio(mode, aiocbp) == 0)
			return (0);
		if (errno == EBADFD) {
			SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		} else if (errno != ENOTSUP) {
			aiocbp->aio_resultp.aio_errno = errno;
			aiocbp->aio_resultp.aio_return = -1;
			aiocbp->aio_state = NOCHECK;
			return (-1);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	reqp = _aio_req_alloc();
	if (reqp == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	/* record how completion is to be notified */
	switch (aiocbp->aio_sigevent.sigev_notify) {
	case SIGEV_NONE:
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
		break;

	case SIGEV_SIGNAL:
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
		break;

	case SIGEV_PORT: {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;

		/* the port number and user value travel in the sigevent */
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo = pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr = pn->portnfy_user;
		break;
	}

	case SIGEV_THREAD:
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
		break;

	default:
		break;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;

	/* describe the transfer itself */
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) != 0 &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */
2348