xref: /onnv-gate/usr/src/lib/libc/port/aio/aio.c (revision 6812:febeba71273d)
12248Sraf /*
22248Sraf  * CDDL HEADER START
32248Sraf  *
42248Sraf  * The contents of this file are subject to the terms of the
52248Sraf  * Common Development and Distribution License (the "License").
62248Sraf  * You may not use this file except in compliance with the License.
72248Sraf  *
82248Sraf  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
92248Sraf  * or http://www.opensolaris.org/os/licensing.
102248Sraf  * See the License for the specific language governing permissions
112248Sraf  * and limitations under the License.
122248Sraf  *
132248Sraf  * When distributing Covered Code, include this CDDL HEADER in each
142248Sraf  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
152248Sraf  * If applicable, add the following below this CDDL HEADER, with the
162248Sraf  * fields enclosed by brackets "[]" replaced with your own identifying
172248Sraf  * information: Portions Copyright [yyyy] [name of copyright owner]
182248Sraf  *
192248Sraf  * CDDL HEADER END
202248Sraf  */
212248Sraf 
222248Sraf /*
235891Sraf  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
242248Sraf  * Use is subject to license terms.
252248Sraf  */
262248Sraf 
272248Sraf #pragma ident	"%Z%%M%	%I%	%E% SMI"
282248Sraf 
29*6812Sraf #include "lint.h"
302248Sraf #include "thr_uberdata.h"
312248Sraf #include "asyncio.h"
322248Sraf #include <atomic.h>
332248Sraf #include <sys/param.h>
342248Sraf #include <sys/file.h>
352248Sraf #include <sys/port.h>
362248Sraf 
372248Sraf static int _aio_hash_insert(aio_result_t *, aio_req_t *);
382248Sraf static aio_req_t *_aio_req_get(aio_worker_t *);
392248Sraf static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
402248Sraf static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
412248Sraf static void _aio_work_done(aio_worker_t *);
422248Sraf static void _aio_enq_doneq(aio_req_t *);
432248Sraf 
442248Sraf extern void _aio_lio_free(aio_lio_t *);
452248Sraf 
462248Sraf extern int __fdsync(int, int);
475937Sraf extern int __fcntl(int, int, ...);
482248Sraf extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
492248Sraf 
502248Sraf static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
512248Sraf static void _aiodone(aio_req_t *, ssize_t, int);
522248Sraf static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
532248Sraf static void _aio_finish_request(aio_worker_t *, ssize_t, int);
542248Sraf 
552248Sraf /*
562248Sraf  * switch for kernel async I/O
572248Sraf  */
582248Sraf int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */
592248Sraf 
602248Sraf /*
612248Sraf  * Key for thread-specific data
622248Sraf  */
632248Sraf pthread_key_t _aio_key;
642248Sraf 
652248Sraf /*
662248Sraf  * Array for determining whether or not a file supports kaio.
672248Sraf  * Initialized in _kaio_init().
682248Sraf  */
692248Sraf uint32_t *_kaio_supported = NULL;
702248Sraf 
712248Sraf /*
722248Sraf  *  workers for read/write requests
732248Sraf  * (__aio_mutex lock protects circular linked list of workers)
742248Sraf  */
752248Sraf aio_worker_t *__workers_rw;	/* circular list of AIO workers */
762248Sraf aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
772248Sraf int __rw_workerscnt;		/* number of read/write workers */
782248Sraf 
792248Sraf /*
802248Sraf  * worker for notification requests.
812248Sraf  */
822248Sraf aio_worker_t *__workers_no;	/* circular list of AIO workers */
832248Sraf aio_worker_t *__nextworker_no;	/* next worker in list of workers */
842248Sraf int __no_workerscnt;		/* number of notification workers */
852248Sraf 
862248Sraf aio_req_t *_aio_done_tail;		/* list of done requests */
872248Sraf aio_req_t *_aio_done_head;
882248Sraf 
892248Sraf mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
902248Sraf cond_t __aio_initcv = DEFAULTCV;
912248Sraf int __aio_initbusy = 0;
922248Sraf 
932248Sraf mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
942248Sraf cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */
952248Sraf 
962248Sraf pid_t __pid = (pid_t)-1;		/* initialize as invalid pid */
972248Sraf int _sigio_enabled = 0;			/* when set, send SIGIO signal */
982248Sraf 
992248Sraf aio_hash_t *_aio_hash;
1002248Sraf 
1012248Sraf aio_req_t *_aio_doneq;			/* double linked done queue list */
1022248Sraf 
1032248Sraf int _aio_donecnt = 0;
1042248Sraf int _aio_waitncnt = 0;			/* # of requests for aio_waitn */
1052248Sraf int _aio_doneq_cnt = 0;
1062248Sraf int _aio_outstand_cnt = 0;		/* # of outstanding requests */
1072248Sraf int _kaio_outstand_cnt = 0;		/* # of outstanding kaio requests */
1082248Sraf int _aio_req_done_cnt = 0;		/* req. done but not in "done queue" */
1092248Sraf int _aio_kernel_suspend = 0;		/* active kernel kaio calls */
1102248Sraf int _aio_suscv_cnt = 0;			/* aio_suspend calls waiting on cv's */
1112248Sraf 
1122248Sraf int _max_workers = 256;			/* max number of workers permitted */
1132248Sraf int _min_workers = 4;			/* min number of workers */
1142248Sraf int _minworkload = 2;			/* min number of requests in q */
1152248Sraf int _aio_worker_cnt = 0;		/* number of workers to do requests */
1162248Sraf int __uaio_ok = 0;			/* AIO has been enabled */
1172248Sraf sigset_t _worker_set;			/* worker's signal mask */
1182248Sraf 
1192248Sraf int _aiowait_flag = 0;			/* when set, aiowait() is inprogress */
1202248Sraf int _aio_flags = 0;			/* see asyncio.h defines for */
1212248Sraf 
1222248Sraf aio_worker_t *_kaiowp = NULL;		/* points to kaio cleanup thread */
1232248Sraf 
1242248Sraf int hz;					/* clock ticks per second */
1252248Sraf 
1262248Sraf static int
1272248Sraf _kaio_supported_init(void)
1282248Sraf {
1292248Sraf 	void *ptr;
1302248Sraf 	size_t size;
1312248Sraf 
1322248Sraf 	if (_kaio_supported != NULL)	/* already initialized */
1332248Sraf 		return (0);
1342248Sraf 
1352248Sraf 	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
1362248Sraf 	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
1372248Sraf 	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
1382248Sraf 	if (ptr == MAP_FAILED)
1392248Sraf 		return (-1);
1402248Sraf 	_kaio_supported = ptr;
1412248Sraf 	return (0);
1422248Sraf }
1432248Sraf 
1442248Sraf /*
1452248Sraf  * The aio subsystem is initialized when an AIO request is made.
1462248Sraf  * Constants are initialized like the max number of workers that
1472248Sraf  * the subsystem can create, and the minimum number of workers
1482248Sraf  * permitted before imposing some restrictions.  Also, some
1492248Sraf  * workers are created.
1502248Sraf  */
/*
 * Bring up the userland aio subsystem: the kaio-supported bit array,
 * the request hash table, the workers' signal mask, and the initial
 * pool of worker threads.  Serialized through __aio_initlock and
 * __aio_initbusy so concurrent first requests initialize exactly once.
 * Returns 0 on success (setting __uaio_ok), -1 on failure.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;
	int cancel_state;

	lmutex_lock(&__aio_initlock);
	/* cond_wait() is a cancellation point; disable cancellation meanwhile */
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;	/* we own initialization until the broadcast below */
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		/* LINTED pointer cast */
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers.
	 * And later check whether at least one worker is created;
	 * lwp_create() calls could fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	/* release initialization ownership and wake any waiters */
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}
2342248Sraf 
2352248Sraf /*
2362248Sraf  * Called from close() before actually performing the real _close().
2372248Sraf  */
2382248Sraf void
2392248Sraf _aio_close(int fd)
2402248Sraf {
2412248Sraf 	if (fd < 0)	/* avoid cancelling everything */
2422248Sraf 		return;
2432248Sraf 	/*
2442248Sraf 	 * Cancel all outstanding aio requests for this file descriptor.
2452248Sraf 	 */
2462248Sraf 	if (__uaio_ok)
2472248Sraf 		(void) aiocancel_all(fd);
2482248Sraf 	/*
2492248Sraf 	 * If we have allocated the bit array, clear the bit for this file.
2502248Sraf 	 * The next open may re-use this file descriptor and the new file
2512248Sraf 	 * may have different kaio() behaviour.
2522248Sraf 	 */
2532248Sraf 	if (_kaio_supported != NULL)
2542248Sraf 		CLEAR_KAIO_SUPPORTED(fd);
2552248Sraf }
2562248Sraf 
/*
 * special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	/* arg is this thread's aio_worker_t; record it in thread-specific data */
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);	/* loops in the kernel servicing kaio */
	return (arg);
}
2692248Sraf 
2702248Sraf /*
2712248Sraf  * initialize kaio.
2722248Sraf  */
/*
 * Initialize kernel aio: set up the kaio-supported bit array,
 * allocate the cleanup worker, and start the kaio cleanup thread.
 * Records the outcome in _kaio_ok (1 = on, -1 = failed).
 * Serialized with __uaio_init() via __aio_initlock/__aio_initbusy.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;
	int cancel_state;

	lmutex_lock(&__aio_initlock);
	/* cond_wait() is a cancellation point; disable cancellation meanwhile */
	(void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
	while (__aio_initbusy)
		(void) cond_wait(&__aio_initcv, &__aio_initlock);
	(void) pthread_setcancelstate(cancel_state, NULL);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;	/* we own initialization until the broadcast below */
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		/* create the cleanup thread with all signals blocked */
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		/* back out the worker allocation on any failure */
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}
3162248Sraf 
/*
 * Start an asynchronous read of bufsz bytes from fd into buf at the
 * position given by offset/whence; completion is reported via *resultp.
 * Returns 0 if queued, -1 with errno set on error.
 */
int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}
3232248Sraf 
/*
 * Start an asynchronous write of bufsz bytes from buf to fd at the
 * position given by offset/whence; completion is reported via *resultp.
 * Returns 0 if queued, -1 with errno set on error.
 */
int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}
3302248Sraf 
#if !defined(_LP64)
/*
 * Large-file (64-bit offset) variants for 32-bit processes.
 * In an LP64 process off_t is already 64 bits, so these are not built.
 */
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */
3462248Sraf 
/*
 * Common code for aioread()/aiowrite() and their large-file variants.
 * Resolves (offset, whence) to an absolute file offset, tries kernel
 * aio first, and falls back to a userland worker-thread request when
 * the kernel does not support aio on this descriptor.
 * Returns 0 if the request was queued, -1 with errno set on error.
 */
int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat64 stat64;
	int error = 0;
	int kerr;
	int umode;

	/* convert (offset, whence) into an absolute file offset */
	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat64(fd, &stat64) == -1)
			error = -1;
		else
			loffset = offset + stat64.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be able
	 * to choose the appropriate 32/64 bit function.  All other functions
	 * only require the difference between READ and WRITE (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		sig_mutex_unlock(&__aio_mutex);
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			return (0);
		}
		/* kaio refused the request; undo the outstanding count */
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		/* remember that kaio cannot handle this descriptor */
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	/* fall back to userland aio; initialize it on first use */
	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32 and 64 bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		/* resultp already has a pending request */
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}
4542248Sraf 
/*
 * Cancel the single outstanding request identified by resultp.
 * Returns 0 if the request was canceled; otherwise -1 with errno set
 * to EINVAL (nothing outstanding / request already done) or EACCES
 * (requests are in flight but this one could not be canceled).
 */
int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		/* not found: distinguish "all done" from "still in flight" */
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		/* lock order: __aio_mutex, then the worker's queue lock */
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}
4972248Sraf 
/* ARGSUSED */
/*
 * Cancellation cleanup handler for aiowait(): if the thread is
 * canceled while blocked in _kaio(AIOWAIT), undo the _aiowait_flag
 * increment made before the call.
 */
static void
_aiowait_cleanup(void *arg)
{
	sig_mutex_lock(&__aio_mutex);
	_aiowait_flag--;
	sig_mutex_unlock(&__aio_mutex);
}
5065891Sraf 
/*
 * Wait for any outstanding request (kernel or userland) to complete
 * and return its aio_result_t; returns NULL on timeout and
 * (aio_result_t *)-1 with errno set on error.  A zero timeval polls
 * without blocking; a NULL uwait blocks indefinitely.
 * This must be asynch safe and cancel safe.
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			/* bounded wait: remember the absolute deadline */
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				/* poll the kernel without blocking */
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				/* nothing outstanding anywhere */
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			/* a userland request has completed */
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		/* -1 from _aio_req_done() means no userland requests pending */
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			/* if canceled inside _kaio(), undo _aiowait_flag */
			pthread_cleanup_push(_aiowait_cleanup, NULL);
			_cancel_prologue();
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			_cancel_epilogue();
			pthread_cleanup_pop(0);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			/* a kernel aio request has completed */
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			/* neither kaio nor userland has anything outstanding */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
6452248Sraf 
6462248Sraf /*
6472248Sraf  * _aio_get_timedelta calculates the remaining time and stores the result
6482248Sraf  * into timespec_t *wait.
6492248Sraf  */
6502248Sraf 
6512248Sraf int
6522248Sraf _aio_get_timedelta(timespec_t *end, timespec_t *wait)
6532248Sraf {
6542248Sraf 	int	ret = 0;
6552248Sraf 	struct	timeval cur;
6562248Sraf 	timespec_t curtime;
6572248Sraf 
6582248Sraf 	(void) gettimeofday(&cur, NULL);
6592248Sraf 	curtime.tv_sec = cur.tv_sec;
6602248Sraf 	curtime.tv_nsec = cur.tv_usec * 1000;   /* convert us to ns */
6612248Sraf 
6622248Sraf 	if (end->tv_sec >= curtime.tv_sec) {
6632248Sraf 		wait->tv_sec = end->tv_sec - curtime.tv_sec;
6642248Sraf 		if (end->tv_nsec >= curtime.tv_nsec) {
6652248Sraf 			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
6662248Sraf 			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
6672248Sraf 				ret = -1;	/* timer expired */
6682248Sraf 		} else {
6692248Sraf 			if (end->tv_sec > curtime.tv_sec) {
6702248Sraf 				wait->tv_sec -= 1;
6712248Sraf 				wait->tv_nsec = NANOSEC -
6722248Sraf 				    (curtime.tv_nsec - end->tv_nsec);
6732248Sraf 			} else {
6742248Sraf 				ret = -1;	/* timer expired */
6752248Sraf 			}
6762248Sraf 		}
6772248Sraf 	} else {
6782248Sraf 		ret = -1;
6792248Sraf 	}
6802248Sraf 	return (ret);
6812248Sraf }
6822248Sraf 
6832248Sraf /*
6842248Sraf  * If closing by file descriptor: we will simply cancel all the outstanding
6852248Sraf  * aio`s and return.  Those aio's in question will have either noticed the
6862248Sraf  * cancellation notice before, during, or after initiating io.
6872248Sraf  */
/*
 * Cancel every outstanding request on file descriptor fd (or all
 * requests if fd is negative).  Returns AIO_CANCELED, AIO_ALLDONE,
 * AIO_NOTCANCELED, or the result of the kernel AIOCANCEL call.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;	/* negative fd means cancel everything */
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			/* unlink, unhash and free the completed request */
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}
7452248Sraf 
7462248Sraf /*
7472248Sraf  * Cancel requests from a given work queue.  If the file descriptor
7482248Sraf  * parameter, fd, is non-negative, then only cancel those requests
7492248Sraf  * in this queue that are to this file descriptor.  If the fd
7502248Sraf  * parameter is -1, then cancel all requests.
7512248Sraf  */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * Caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one inprogress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}
7852248Sraf 
7862248Sraf /*
7872248Sraf  * Cancel a request.  Return 1 if the callers locks were temporarily
7882248Sraf  * dropped, otherwise return 0.
7892248Sraf  */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);	/* already canceled; nothing to do */
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;	/* too late to cancel a completed request */
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		/* interrupt the worker servicing this request */
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	/*
	 * POSIX request: _aiodone() must run without the locks held,
	 * so drop them, finish the request, re-acquire, and tell the
	 * caller the locks were dropped by returning 1.
	 */
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}
8422248Sraf 
/*
 * Create a new worker thread of the class selected by 'mode':
 * read/write workers run _aio_do_request(); AIONOTIFY workers run
 * _aio_do_notify().  If 'reqp' is non-NULL it is installed as the
 * sole entry on the new worker's queue before the thread runs.
 * Returns 0 on success, -1 on allocation or thread-creation failure.
 */
int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		/* pre-load the new worker's queue with this request */
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	/*
	 * Create the thread with the signal mask set to 'maskset' so the
	 * new thread starts with those signals blocked, and suspended so
	 * it cannot run before it is linked into the worker ring below.
	 */
	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		/* undo the tentative queueing and release the worker */
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	/* link the new worker into the circular list for its class */
	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	/* the worker is fully set up; let it start running */
	(void) thr_continue(aiowp->work_tid);

	return (0);
}
9262248Sraf 
9272248Sraf /*
9282248Sraf  * This is the worker's main routine.
9292248Sraf  * The task of this function is to execute all queued requests;
9302248Sraf  * once the last pending request is executed this function will block
9312248Sraf  * in _aio_idle().  A new incoming request must wakeup this thread to
9322248Sraf  * restart the work.
 * Every worker has its own work queue.  The queue lock is required
9342248Sraf  * to synchronize the addition of new requests for this worker or
9352248Sraf  * cancellation of pending/running requests.
9362248Sraf  *
9372248Sraf  * Cancellation scenarios:
9382248Sraf  * The cancellation of a request is being done asynchronously using
9392248Sraf  * _aio_cancel_req() from another thread context.
9402248Sraf  * A queued request can be cancelled in different manners :
9412248Sraf  * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
9422248Sraf  *	- lock the queue -> remove the request -> unlock the queue
9432248Sraf  *	- this function/thread does not detect this cancellation process
9442248Sraf  * b) request is in progress (AIO_REQ_INPROGRESS) :
 *	- this function first allows the cancellation of the running
9462248Sraf  *	  request with the flag "work_cancel_flg=1"
9472248Sraf  * 		see _aio_req_get() -> _aio_cancel_on()
9482248Sraf  *	  During this phase, it is allowed to interrupt the worker
9492248Sraf  *	  thread running the request (this thread) using the SIGAIOCANCEL
9502248Sraf  *	  signal.
9512248Sraf  *	  Once this thread returns from the kernel (because the request
9522248Sraf  *	  is just done), then it must disable a possible cancellation
9532248Sraf  *	  and proceed to finish the request.  To disable the cancellation
9542248Sraf  *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
9552248Sraf  * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
9562248Sraf  *	  same procedure as in a)
9572248Sraf  *
9582248Sraf  * To b)
 *	This thread uses sigsetjmp() to define the position in the code where
 *	it wishes to continue working in the case that a SIGAIOCANCEL signal
9612248Sraf  *	is detected.
9622248Sraf  *	Normally this thread should get the cancellation signal during the
9632248Sraf  *	kernel phase (reading or writing).  In that case the signal handler
9642248Sraf  *	aiosigcancelhndlr() is activated using the worker thread context,
9652248Sraf  *	which again will use the siglongjmp() function to break the standard
9662248Sraf  *	code flow and jump to the "sigsetjmp" position, provided that
9672248Sraf  *	"work_cancel_flg" is set to "1".
9682248Sraf  *	Because the "work_cancel_flg" is only manipulated by this worker
9692248Sraf  *	thread and it can only run on one CPU at a given time, it is not
9702248Sraf  *	necessary to protect that flag with the queue lock.
9712248Sraf  *	Returning from the kernel (read or write system call) we must
9722248Sraf  *	first disable the use of the SIGAIOCANCEL signal and accordingly
9732248Sraf  *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
9752248Sraf  *	  blocks in "work_qlock1",
9762248Sraf  *	- then a second thread cancels the apparently "in progress" request
9772248Sraf  *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread gets assigned the "work_qlock1" and will return
9792248Sraf  *	  from the kernel,
9802248Sraf  *	- the kernel detects the pending signal and activates the signal
9812248Sraf  *	  handler instead,
9822248Sraf  *	- if the "work_cancel_flg" is still set then the signal handler
9832248Sraf  *	  should use siglongjmp() to cancel the "in progress" request and
9842248Sraf  *	  it would try to acquire the same work_qlock1 in _aio_req_get()
9852248Sraf  *	  for a second time => deadlock.
9862248Sraf  *	To avoid that situation we disable the cancellation of the request
9872248Sraf  *	in progress BEFORE we try to acquire the work_qlock1.
9882248Sraf  *	In that case the signal handler will not call siglongjmp() and the
9892248Sraf  *	worker thread will continue running the standard code flow.
 *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
 *	a siglongjmp() that might otherwise have been required, freeing the
 *	work_qlock1 and avoiding a deadlock.
9932248Sraf  */
9942248Sraf void *
9952248Sraf _aio_do_request(void *arglist)
9962248Sraf {
9972248Sraf 	aio_worker_t *aiowp = (aio_worker_t *)arglist;
9982248Sraf 	ulwp_t *self = curthread;
9992248Sraf 	struct aio_args *arg;
10002248Sraf 	aio_req_t *reqp;		/* current AIO request */
10012248Sraf 	ssize_t retval;
10025937Sraf 	int append;
10032248Sraf 	int error;
10042248Sraf 
10052248Sraf 	if (pthread_setspecific(_aio_key, aiowp) != 0)
10062248Sraf 		aio_panic("_aio_do_request, pthread_setspecific()");
10072248Sraf 	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
10082248Sraf 	ASSERT(aiowp->work_req == NULL);
10092248Sraf 
10102248Sraf 	/*
10112248Sraf 	 * We resume here when an operation is cancelled.
10122248Sraf 	 * On first entry, aiowp->work_req == NULL, so all
10132248Sraf 	 * we do is block SIGAIOCANCEL.
10142248Sraf 	 */
10152248Sraf 	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
10162248Sraf 	ASSERT(self->ul_sigdefer == 0);
10172248Sraf 
10182248Sraf 	sigoff(self);	/* block SIGAIOCANCEL */
10192248Sraf 	if (aiowp->work_req != NULL)
10202248Sraf 		_aio_finish_request(aiowp, -1, ECANCELED);
10212248Sraf 
10222248Sraf 	for (;;) {
10232248Sraf 		/*
10242248Sraf 		 * Put completed requests on aio_done_list.  This has
10252248Sraf 		 * to be done as part of the main loop to ensure that
10262248Sraf 		 * we don't artificially starve any aiowait'ers.
10272248Sraf 		 */
10282248Sraf 		if (aiowp->work_done1)
10292248Sraf 			_aio_work_done(aiowp);
10302248Sraf 
10312248Sraf top:
10322248Sraf 		/* consume any deferred SIGAIOCANCEL signal here */
10332248Sraf 		sigon(self);
10342248Sraf 		sigoff(self);
10352248Sraf 
10362248Sraf 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
10372248Sraf 			if (_aio_idle(aiowp) != 0)
10382248Sraf 				goto top;
10392248Sraf 		}
10402248Sraf 		arg = &reqp->req_args;
10412248Sraf 		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
10422248Sraf 		    reqp->req_state == AIO_REQ_CANCELED);
10432248Sraf 		error = 0;
10442248Sraf 
10452248Sraf 		switch (reqp->req_op) {
10462248Sraf 		case AIOREAD:
10472248Sraf 		case AIOAREAD:
10482248Sraf 			sigon(self);	/* unblock SIGAIOCANCEL */
10492248Sraf 			retval = pread(arg->fd, arg->buf,
10502248Sraf 			    arg->bufsz, arg->offset);
10512248Sraf 			if (retval == -1) {
10522248Sraf 				if (errno == ESPIPE) {
10532248Sraf 					retval = read(arg->fd,
10542248Sraf 					    arg->buf, arg->bufsz);
10552248Sraf 					if (retval == -1)
10562248Sraf 						error = errno;
10572248Sraf 				} else {
10582248Sraf 					error = errno;
10592248Sraf 				}
10602248Sraf 			}
10612248Sraf 			sigoff(self);	/* block SIGAIOCANCEL */
10622248Sraf 			break;
10632248Sraf 		case AIOWRITE:
10642248Sraf 		case AIOAWRITE:
10655937Sraf 			/*
10665937Sraf 			 * The SUSv3 POSIX spec for aio_write() states:
10675937Sraf 			 *	If O_APPEND is set for the file descriptor,
10685937Sraf 			 *	write operations append to the file in the
10695937Sraf 			 *	same order as the calls were made.
10705937Sraf 			 * but, somewhat inconsistently, it requires pwrite()
10715937Sraf 			 * to ignore the O_APPEND setting.  So we have to use
10725937Sraf 			 * fcntl() to get the open modes and call write() for
10735937Sraf 			 * the O_APPEND case.
10745937Sraf 			 */
10755937Sraf 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
10762248Sraf 			sigon(self);	/* unblock SIGAIOCANCEL */
10775937Sraf 			retval = append?
10785937Sraf 			    write(arg->fd, arg->buf, arg->bufsz) :
10795937Sraf 			    pwrite(arg->fd, arg->buf, arg->bufsz,
10805937Sraf 			    arg->offset);
10812248Sraf 			if (retval == -1) {
10822248Sraf 				if (errno == ESPIPE) {
10832248Sraf 					retval = write(arg->fd,
10842248Sraf 					    arg->buf, arg->bufsz);
10852248Sraf 					if (retval == -1)
10862248Sraf 						error = errno;
10872248Sraf 				} else {
10882248Sraf 					error = errno;
10892248Sraf 				}
10902248Sraf 			}
10912248Sraf 			sigoff(self);	/* block SIGAIOCANCEL */
10922248Sraf 			break;
10932248Sraf #if !defined(_LP64)
10942248Sraf 		case AIOAREAD64:
10952248Sraf 			sigon(self);	/* unblock SIGAIOCANCEL */
10962248Sraf 			retval = pread64(arg->fd, arg->buf,
10972248Sraf 			    arg->bufsz, arg->offset);
10982248Sraf 			if (retval == -1) {
10992248Sraf 				if (errno == ESPIPE) {
11002248Sraf 					retval = read(arg->fd,
11012248Sraf 					    arg->buf, arg->bufsz);
11022248Sraf 					if (retval == -1)
11032248Sraf 						error = errno;
11042248Sraf 				} else {
11052248Sraf 					error = errno;
11062248Sraf 				}
11072248Sraf 			}
11082248Sraf 			sigoff(self);	/* block SIGAIOCANCEL */
11092248Sraf 			break;
11102248Sraf 		case AIOAWRITE64:
11115937Sraf 			/*
11125937Sraf 			 * The SUSv3 POSIX spec for aio_write() states:
11135937Sraf 			 *	If O_APPEND is set for the file descriptor,
11145937Sraf 			 *	write operations append to the file in the
11155937Sraf 			 *	same order as the calls were made.
11165937Sraf 			 * but, somewhat inconsistently, it requires pwrite()
11175937Sraf 			 * to ignore the O_APPEND setting.  So we have to use
11185937Sraf 			 * fcntl() to get the open modes and call write() for
11195937Sraf 			 * the O_APPEND case.
11205937Sraf 			 */
11215937Sraf 			append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
11222248Sraf 			sigon(self);	/* unblock SIGAIOCANCEL */
11235937Sraf 			retval = append?
11245937Sraf 			    write(arg->fd, arg->buf, arg->bufsz) :
11255937Sraf 			    pwrite64(arg->fd, arg->buf, arg->bufsz,
11265937Sraf 			    arg->offset);
11272248Sraf 			if (retval == -1) {
11282248Sraf 				if (errno == ESPIPE) {
11292248Sraf 					retval = write(arg->fd,
11302248Sraf 					    arg->buf, arg->bufsz);
11312248Sraf 					if (retval == -1)
11322248Sraf 						error = errno;
11332248Sraf 				} else {
11342248Sraf 					error = errno;
11352248Sraf 				}
11362248Sraf 			}
11372248Sraf 			sigoff(self);	/* block SIGAIOCANCEL */
11382248Sraf 			break;
11392248Sraf #endif	/* !defined(_LP64) */
11402248Sraf 		case AIOFSYNC:
11412248Sraf 			if (_aio_fsync_del(aiowp, reqp))
11422248Sraf 				goto top;
11432248Sraf 			ASSERT(reqp->req_head == NULL);
11442248Sraf 			/*
11452248Sraf 			 * All writes for this fsync request are now
11462248Sraf 			 * acknowledged.  Now make these writes visible
11472248Sraf 			 * and put the final request into the hash table.
11482248Sraf 			 */
11492248Sraf 			if (reqp->req_state == AIO_REQ_CANCELED) {
11502248Sraf 				/* EMPTY */;
11512248Sraf 			} else if (arg->offset == O_SYNC) {
11522248Sraf 				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
11532248Sraf 					error = errno;
11542248Sraf 			} else {
11552248Sraf 				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
11562248Sraf 					error = errno;
11572248Sraf 			}
11582248Sraf 			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
11592248Sraf 				aio_panic("_aio_do_request(): AIOFSYNC: "
11602248Sraf 				    "request already in hash table");
11612248Sraf 			break;
11622248Sraf 		default:
11632248Sraf 			aio_panic("_aio_do_request, bad op");
11642248Sraf 		}
11652248Sraf 
11662248Sraf 		_aio_finish_request(aiowp, retval, error);
11672248Sraf 	}
11682248Sraf 	/* NOTREACHED */
11692248Sraf 	return (NULL);
11702248Sraf }
11712248Sraf 
/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been cancelled.
 *
 * If the request was cancelled while in progress, the caller-supplied
 * (retval, error) pair is overridden with (-1, ECANCELED).
 * Posix requests are completed via _aiodone().  Solaris (aiowait-style)
 * requests have their results set here: cancelled ones are freed
 * immediately, completed ones are counted in _aio_req_done_cnt and left
 * for _aio_work_done() to move to the global done queue.
 */
static void
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_req) == NULL)
		/* nothing in progress (e.g. already torn down); no-op */
		sig_mutex_unlock(&aiowp->work_qlock1);
	else {
		aiowp->work_req = NULL;
		if (reqp->req_state == AIO_REQ_CANCELED) {
			retval = -1;
			error = ECANCELED;
		}
		if (!POSIX_AIO(reqp)) {
			int notify;
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			/*
			 * If it was canceled, this request will not be
			 * added to the done list.  Just free it.
			 */
			if (error == ECANCELED) {
				_aio_outstand_cnt--;
				_aio_req_free(reqp);
			} else {
				_aio_set_result(reqp, retval, error);
				_aio_req_done_cnt++;
			}
			/*
			 * Notify any thread that may have blocked
			 * because it saw an outstanding request.
			 */
			notify = 0;
			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
				notify = 1;
			}
			/* drop the lock before the potentially-blocking call */
			sig_mutex_unlock(&__aio_mutex);
			if (notify) {
				(void) _kaio(AIONOTIFY);
			}
		} else {
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			sig_mutex_unlock(&aiowp->work_qlock1);
			_aiodone(reqp, retval, error);
		}
	}
}
12272248Sraf 
12282248Sraf void
12292248Sraf _aio_req_mark_done(aio_req_t *reqp)
12302248Sraf {
12312248Sraf #if !defined(_LP64)
12322248Sraf 	if (reqp->req_largefile)
12332248Sraf 		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
12342248Sraf 	else
12352248Sraf #endif
12362248Sraf 		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
12372248Sraf }
12382248Sraf 
12392248Sraf /*
12402248Sraf  * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
12412248Sraf  * hopefully to consume one of our queued signals.
12422248Sraf  */
12432248Sraf static void
12442248Sraf _aio_delay(int ticks)
12452248Sraf {
12462248Sraf 	(void) usleep(ticks * (MICROSEC / hz));
12472248Sraf }
12482248Sraf 
12492248Sraf /*
12502248Sraf  * Actually send the notifications.
12512248Sraf  * We could block indefinitely here if the application
12522248Sraf  * is not listening for the signal or port notifications.
12532248Sraf  */
12542248Sraf static void
12552248Sraf send_notification(notif_param_t *npp)
12562248Sraf {
12572248Sraf 	extern int __sigqueue(pid_t pid, int signo,
12584502Spraks 	    /* const union sigval */ void *value, int si_code, int block);
12592248Sraf 
12602248Sraf 	if (npp->np_signo)
12612248Sraf 		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
12622248Sraf 		    SI_ASYNCIO, 1);
12632248Sraf 	else if (npp->np_port >= 0)
12642248Sraf 		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
12652248Sraf 		    npp->np_event, npp->np_object, npp->np_user);
12662248Sraf 
12672248Sraf 	if (npp->np_lio_signo)
12682248Sraf 		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
12692248Sraf 		    SI_ASYNCIO, 1);
12702248Sraf 	else if (npp->np_lio_port >= 0)
12712248Sraf 		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
12722248Sraf 		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
12732248Sraf }
12742248Sraf 
12752248Sraf /*
12762248Sraf  * Asynchronous notification worker.
12772248Sraf  */
12782248Sraf void *
12792248Sraf _aio_do_notify(void *arg)
12802248Sraf {
12812248Sraf 	aio_worker_t *aiowp = (aio_worker_t *)arg;
12822248Sraf 	aio_req_t *reqp;
12832248Sraf 
12842248Sraf 	/*
12852248Sraf 	 * This isn't really necessary.  All signals are blocked.
12862248Sraf 	 */
12872248Sraf 	if (pthread_setspecific(_aio_key, aiowp) != 0)
12882248Sraf 		aio_panic("_aio_do_notify, pthread_setspecific()");
12892248Sraf 
12902248Sraf 	/*
12912248Sraf 	 * Notifications are never cancelled.
12922248Sraf 	 * All signals remain blocked, forever.
12932248Sraf 	 */
12942248Sraf 	for (;;) {
12952248Sraf 		while ((reqp = _aio_req_get(aiowp)) == NULL) {
12962248Sraf 			if (_aio_idle(aiowp) != 0)
12972248Sraf 				aio_panic("_aio_do_notify: _aio_idle() failed");
12982248Sraf 		}
12992248Sraf 		send_notification(&reqp->req_notify);
13002248Sraf 		_aio_req_free(reqp);
13012248Sraf 	}
13022248Sraf 
13032248Sraf 	/* NOTREACHED */
13042248Sraf 	return (NULL);
13052248Sraf }
13062248Sraf 
/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 *
 * Under __aio_mutex: records the result, adjusts the outstanding
 * count, and either enqueues the request on the done queue
 * (SIGEV_NONE) or removes it from the hash table and marks the
 * user's aiocb done.  After dropping the lock, performs any listio
 * bookkeeping and delivers the signal/port notifications.
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		/* bitwise OR of 0/1 flags; equivalent to logical OR here */
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	/* don't overwrite a result already set (e.g. by cancellation) */
	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		/* SIGEV_NONE: results are reaped via the done queue */
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			/* this was the last request of the listio group */
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {			/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			/* if a waiter was signalled, it frees the head */
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}
14772248Sraf 
/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 *
 * Each queued copy of the fsync request holds a reference on the
 * lio head; only the thread that drops the reference count to one
 * actually performs the fsync (return 0).  All other copies are
 * freed here (return 1), as is the last copy when the group has
 * been torn down (LIO_DESTROY).
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		/* not the last reference: discard this copy of the request */
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	/* this is the surviving copy; detach it from the lio head */
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}
15222248Sraf 
15232248Sraf /*
15242248Sraf  * A worker is set idle when its work queue is empty.
15252248Sraf  * The worker checks again that it has no more work
15262248Sraf  * and then goes to sleep waiting for more work.
15272248Sraf  */
15282248Sraf int
15292248Sraf _aio_idle(aio_worker_t *aiowp)
15302248Sraf {
15312248Sraf 	int error = 0;
15322248Sraf 
15332248Sraf 	sig_mutex_lock(&aiowp->work_qlock1);
15342248Sraf 	if (aiowp->work_count1 == 0) {
15352248Sraf 		ASSERT(aiowp->work_minload1 == 0);
15362248Sraf 		aiowp->work_idleflg = 1;
15372248Sraf 		/*
15382248Sraf 		 * A cancellation handler is not needed here.
15392248Sraf 		 * aio worker threads are never cancelled via pthread_cancel().
15402248Sraf 		 */
15412248Sraf 		error = sig_cond_wait(&aiowp->work_idle_cv,
15422248Sraf 		    &aiowp->work_qlock1);
15432248Sraf 		/*
15442248Sraf 		 * The idle flag is normally cleared before worker is awakened
15452248Sraf 		 * by aio_req_add().  On error (EINTR), we clear it ourself.
15462248Sraf 		 */
15472248Sraf 		if (error)
15482248Sraf 			aiowp->work_idleflg = 0;
15492248Sraf 	}
15502248Sraf 	sig_mutex_unlock(&aiowp->work_qlock1);
15512248Sraf 	return (error);
15522248Sraf }
15532248Sraf 
/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * work_prev1 appears to be the completed request most recently
	 * taken off the queue (set in _aio_req_get(), not visible here);
	 * detach it and advance the worker's queue pointers past it.
	 */
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	/* move the request from "outstanding" to "done" accounting */
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	/* append to the global done queue (head is the append end) */
	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		/* a thread is blocked in the kernel via aiowait(); wake it */
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}
15992248Sraf 
16002248Sraf /*
16012248Sraf  * The done queue consists of AIO requests that are in either the
16022248Sraf  * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
16032248Sraf  * are discarded.  If the done queue is empty then NULL is returned.
16042248Sraf  * Otherwise the address of a done aio_result_t is returned.
16052248Sraf  */
16062248Sraf aio_result_t *
16072248Sraf _aio_req_done(void)
16082248Sraf {
16092248Sraf 	aio_req_t *reqp;
16102248Sraf 	aio_result_t *resultp;
16112248Sraf 
16122248Sraf 	ASSERT(MUTEX_HELD(&__aio_mutex));
16132248Sraf 
16142248Sraf 	if ((reqp = _aio_done_tail) != NULL) {
16152248Sraf 		if ((_aio_done_tail = reqp->req_next) == NULL)
16162248Sraf 			_aio_done_head = NULL;
16172248Sraf 		ASSERT(_aio_donecnt > 0);
16182248Sraf 		_aio_donecnt--;
16192248Sraf 		(void) _aio_hash_del(reqp->req_resultp);
16202248Sraf 		resultp = reqp->req_resultp;
16212248Sraf 		ASSERT(reqp->req_state == AIO_REQ_DONE);
16222248Sraf 		_aio_req_free(reqp);
16232248Sraf 		return (resultp);
16242248Sraf 	}
16252248Sraf 	/* is queue empty? */
16262248Sraf 	if (reqp == NULL && _aio_outstand_cnt == 0) {
16272248Sraf 		return ((aio_result_t *)-1);
16282248Sraf 	}
16292248Sraf 	return (NULL);
16302248Sraf }
16312248Sraf 
/*
 * Set the return and errno values for the application's use.
 *
 * For the Posix interfaces, we must set the return value first followed
 * by the errno value because the Posix interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 *
 * The membar_producer() between the two stores keeps them visible to
 * other CPUs in this exact order; a reader that observes the
 * "completion" field is thereby guaranteed to see the other field's
 * final value as well.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		/* Posix: return value first, errno (completion flag) last */
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		/* Solaris: errno first, return value (completion flag) last */
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}
16592248Sraf 
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 *
 * SIGIO is deferred for the duration so the queue manipulation cannot
 * be interrupted by the SIGIO handler.  For read/write modes, *nextworker
 * is advanced under __aio_mutex after a successful enqueue so subsequent
 * requests are load-balanced across the worker ring.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					/* idle worker found; keep its lock */
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				/* new worker takes reqp; nothing to enqueue */
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker available and we have created
			 * _max_workers, keep going through the
			 * list slowly until we get a lock
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			/*
			 * This worker is already loaded past _minworkload;
			 * hand the request to a brand-new worker instead.
			 */
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		/* fsync/notify go to the designated worker; no balancing */
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put request onto worker's work queue.
	 * work_qlock1 is held here on all paths that reach this point.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		/* advance the ring so the next request tries a new worker */
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);	/* reenable SIGIO */
}
18032248Sraf 
/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 * On success the request is marked AIO_REQ_INPROGRESS and recorded in
 * aiowp->work_req.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singularly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-posix requests are left on the queue
		 * to eventually be placed on the done queue.
		 */

		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				/* reqp is at the tail of the queue */
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}

		} else {
			/* leave on queue; track it as "done but queued" */
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			/* read/write requests also count against minload */
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}
18652248Sraf 
/*
 * Remove a request from its worker's work queue.
 * ostate is the request's state at the time of the call and must be
 * either AIO_REQ_QUEUED or AIO_REQ_INPROGRESS.  For POSIX requests
 * that are no longer queued this is a no-op (POSIX requests are
 * unlinked at dispatch time by _aio_req_get()).
 * Caller must hold the worker's work_qlock1; the request is expected
 * to be found on the queue (see the NOTREACHED annotation).
 */
static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	/* walk from the tail, remembering the predecessor */
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			/* unlink and repair the dispatch cursor */
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			/*
			 * Repair head/prev: point them forward when a
			 * successor exists (or nothing is "done but
			 * queued"), otherwise back at the predecessor.
			 */
			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				/* in-progress (non-POSIX) requests were
				 * counted in work_done1 by _aio_req_get() */
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}
19182248Sraf 
19192248Sraf static void
19202248Sraf _aio_enq_doneq(aio_req_t *reqp)
19212248Sraf {
19222248Sraf 	if (_aio_doneq == NULL) {
19232248Sraf 		_aio_doneq = reqp;
19242248Sraf 		reqp->req_next = reqp->req_prev = reqp;
19252248Sraf 	} else {
19262248Sraf 		reqp->req_next = _aio_doneq;
19272248Sraf 		reqp->req_prev = _aio_doneq->req_prev;
19282248Sraf 		_aio_doneq->req_prev->req_next = reqp;
19292248Sraf 		_aio_doneq->req_prev = reqp;
19302248Sraf 	}
19312248Sraf 	reqp->req_state = AIO_REQ_DONEQ;
19322248Sraf 	_aio_doneq_cnt++;
19332248Sraf }
19342248Sraf 
19352248Sraf /*
19362248Sraf  * caller owns the _aio_mutex
19372248Sraf  */
19382248Sraf aio_req_t *
19392248Sraf _aio_req_remove(aio_req_t *reqp)
19402248Sraf {
19412248Sraf 	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
19422248Sraf 		return (NULL);
19432248Sraf 
19442248Sraf 	if (reqp) {
19452248Sraf 		/* request in done queue */
19462248Sraf 		if (_aio_doneq == reqp)
19472248Sraf 			_aio_doneq = reqp->req_next;
19482248Sraf 		if (_aio_doneq == reqp) {
19492248Sraf 			/* only one request on queue */
19502248Sraf 			_aio_doneq = NULL;
19512248Sraf 		} else {
19522248Sraf 			aio_req_t *tmp = reqp->req_next;
19532248Sraf 			reqp->req_prev->req_next = tmp;
19542248Sraf 			tmp->req_prev = reqp->req_prev;
19552248Sraf 		}
19562248Sraf 	} else if ((reqp = _aio_doneq) != NULL) {
19572248Sraf 		if (reqp == reqp->req_next) {
19582248Sraf 			/* only one request on queue */
19592248Sraf 			_aio_doneq = NULL;
19602248Sraf 		} else {
19612248Sraf 			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
19622248Sraf 			_aio_doneq->req_prev = reqp->req_prev;
19632248Sraf 		}
19642248Sraf 	}
19652248Sraf 	if (reqp) {
19662248Sraf 		_aio_doneq_cnt--;
19672248Sraf 		reqp->req_next = reqp->req_prev = reqp;
19682248Sraf 		reqp->req_state = AIO_REQ_DONE;
19692248Sraf 	}
19702248Sraf 	return (reqp);
19712248Sraf }
19722248Sraf 
19732248Sraf /*
19742248Sraf  * An AIO request is identified by an aio_result_t pointer.  The library
19752248Sraf  * maps this aio_result_t pointer to its internal representation using a
19762248Sraf  * hash table.  This function adds an aio_result_t pointer to the hash table.
19772248Sraf  */
19782248Sraf static int
19792248Sraf _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
19802248Sraf {
19812248Sraf 	aio_hash_t *hashp;
19822248Sraf 	aio_req_t **prev;
19832248Sraf 	aio_req_t *next;
19842248Sraf 
19852248Sraf 	hashp = _aio_hash + AIOHASH(resultp);
19862248Sraf 	lmutex_lock(&hashp->hash_lock);
19872248Sraf 	prev = &hashp->hash_ptr;
19882248Sraf 	while ((next = *prev) != NULL) {
19892248Sraf 		if (resultp == next->req_resultp) {
19902248Sraf 			lmutex_unlock(&hashp->hash_lock);
19912248Sraf 			return (-1);
19922248Sraf 		}
19932248Sraf 		prev = &next->req_link;
19942248Sraf 	}
19952248Sraf 	*prev = reqp;
19962248Sraf 	ASSERT(reqp->req_link == NULL);
19972248Sraf 	lmutex_unlock(&hashp->hash_lock);
19982248Sraf 	return (0);
19992248Sraf }
20002248Sraf 
20012248Sraf /*
20022248Sraf  * Remove an entry from the hash table.
20032248Sraf  */
20042248Sraf aio_req_t *
20052248Sraf _aio_hash_del(aio_result_t *resultp)
20062248Sraf {
20072248Sraf 	aio_hash_t *hashp;
20082248Sraf 	aio_req_t **prev;
20092248Sraf 	aio_req_t *next = NULL;
20102248Sraf 
20112248Sraf 	if (_aio_hash != NULL) {
20122248Sraf 		hashp = _aio_hash + AIOHASH(resultp);
20132248Sraf 		lmutex_lock(&hashp->hash_lock);
20142248Sraf 		prev = &hashp->hash_ptr;
20152248Sraf 		while ((next = *prev) != NULL) {
20162248Sraf 			if (resultp == next->req_resultp) {
20172248Sraf 				*prev = next->req_link;
20182248Sraf 				next->req_link = NULL;
20192248Sraf 				break;
20202248Sraf 			}
20212248Sraf 			prev = &next->req_link;
20222248Sraf 		}
20232248Sraf 		lmutex_unlock(&hashp->hash_lock);
20242248Sraf 	}
20252248Sraf 	return (next);
20262248Sraf }
20272248Sraf 
20282248Sraf /*
20292248Sraf  *  find an entry in the hash table
20302248Sraf  */
20312248Sraf aio_req_t *
20322248Sraf _aio_hash_find(aio_result_t *resultp)
20332248Sraf {
20342248Sraf 	aio_hash_t *hashp;
20352248Sraf 	aio_req_t **prev;
20362248Sraf 	aio_req_t *next = NULL;
20372248Sraf 
20382248Sraf 	if (_aio_hash != NULL) {
20392248Sraf 		hashp = _aio_hash + AIOHASH(resultp);
20402248Sraf 		lmutex_lock(&hashp->hash_lock);
20412248Sraf 		prev = &hashp->hash_ptr;
20422248Sraf 		while ((next = *prev) != NULL) {
20432248Sraf 			if (resultp == next->req_resultp)
20442248Sraf 				break;
20452248Sraf 			prev = &next->req_link;
20462248Sraf 		}
20472248Sraf 		lmutex_unlock(&hashp->hash_lock);
20482248Sraf 	}
20492248Sraf 	return (next);
20502248Sraf }
20512248Sraf 
/*
 * AIO interface for POSIX
 *
 * Submit the asynchronous request described by aiocbp.  mode selects
 * the operation (AIOAREAD, AIOAWRITE, ...); flg is a bitmask:
 * AIO_KAIO attempts kernel aio first, AIO_NO_DUPS rejects an aiocb
 * whose result pointer is already registered in the hash table.
 * Returns 0 on successful submission, -1 with errno set on failure.
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we dont want to repeat the
	 * system call
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				/* hard kernel failure: report it */
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	/* fall back to the user-level (worker thread) implementation */
	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	/* copy the caller's completion notification into the request */
	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		/*
		 * The sigevent structure contains the port number
		 * and the user value.  Same for SIGEV_PORT, above.
		 */
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		/* duplicate submission of the same aiocb is a caller bug */
		aio_panic("_aio_rw(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
21712248Sraf 
21722248Sraf #if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 *
 * Large-file (aiocb64_t) counterpart of _aio_rw(): identical logic,
 * but the request is flagged req_largefile = 1.
 * Returns 0 on successful submission, -1 with errno set on failure.
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we dont want to repeat the
	 * system call
	 */

	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				/* hard kernel failure: report it */
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	/* fall back to the user-level (worker thread) implementation */
	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	/* copy the caller's completion notification into the request */
	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		/*
		 * Reuse the sigevent structure to carry the port number
		 * and user value, as in _aio_rw().
		 */
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		/* duplicate submission of the same aiocb is a caller bug */
		aio_panic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
22842248Sraf #endif	/* !defined(_LP64) */
2285