/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2007 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "synonyms.h"
#include "thr_uberdata.h"
#include "asyncio.h"
#include <atomic.h>
#include <sys/param.h>
#include <sys/file.h>
#include <sys/port.h>

static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;		/* doubly linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* requests done but not on the done queue */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of requests in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
int _aio_flags = 0;		/* see defines in asyncio.h */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}
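/*
 * Illustrative sketch (not the actual asyncio.h definitions): the
 * _kaio_supported array mapped above is naturally used as a per-fd
 * bitmask, one uint32_t covering 32 file descriptors, so that macros
 * like KAIO_SUPPORTED(fd) reduce to constant-time bit tests:
 *
 *	#define	KAIO_FLAG(fd)	(1U << ((fd) % 32))
 *	#define	KAIO_CELL(fd)	(_kaio_supported[(fd) / 32])
 *	#define	SET_KAIO_NOT_SUPPORTED(fd)  (KAIO_CELL(fd) |= KAIO_FLAG(fd))
 *	#define	CLEAR_KAIO_SUPPORTED(fd)    (KAIO_CELL(fd) &= ~KAIO_FLAG(fd))
 *	#define	KAIO_SUPPORTED(fd)  ((KAIO_CELL(fd) & KAIO_FLAG(fd)) == 0)
 *
 * The real macros live in asyncio.h and may differ in detail; this is
 * only a model of the idea.
 */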
/*
 * The aio subsystem is initialized when an AIO request is made.
 * Constants are initialized, such as the maximum number of workers
 * the subsystem can create and the minimum number of workers
 * permitted before imposing some restrictions.  Also, some
 * workers are created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		/* LINTED pointer cast */
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers.
	 * And later check whether at least one worker is created;
	 * lwp_create() calls could fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}

/*
 * Called from close() before actually performing the real _close().
 */
void
_aio_close(int fd)
{
	if (fd < 0)	/* avoid cancelling everything */
		return;
	/*
	 * Cancel all outstanding aio requests for this file descriptor.
	 */
	if (__uaio_ok)
		(void) aiocancel_all(fd);
	/*
	 * If we have allocated the bit array, clear the bit for this file.
	 * The next open may re-use this file descriptor and the new file
	 * may have different kaio() behaviour.
	 */
	if (_kaio_supported != NULL)
		CLEAR_KAIO_SUPPORTED(fd);
}

/*
 * special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

/*
 * initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */
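/*
 * Usage sketch for the wrappers above (illustrative only, not part of
 * this file; fd and buf are hypothetical, error handling abbreviated):
 * a caller issues an asynchronous read and reaps it with aiowait().
 *
 *	aio_result_t res;
 *
 *	if (aioread(fd, buf, sizeof (buf), 0, SEEK_SET, &res) == 0) {
 *		aio_result_t *donep = aiowait(NULL);	(blocks until done)
 *		if (donep == &res && res.aio_return != -1)
 *			... res.aio_return bytes were read into buf ...
 *	}
 */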
int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat64 stat64;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {

	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat64(fd, &stat64) == -1)
			error = -1;
		else
			loffset = offset + stat64.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be
	 * able to choose the appropriate 32/64-bit function.  All other
	 * functions only require the difference between READ and WRITE
	 * (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		sig_mutex_unlock(&__aio_mutex);
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			return (0);
		}
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32- and 64-bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}

int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}
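/*
 * Caller-side sketch (hypothetical, not part of this file): a zero
 * return from aiocancel() means the request was removed before a
 * worker completed it.  Otherwise errno is set as coded above: EACCES
 * while requests may still be in progress, EINVAL when nothing is
 * left that could be canceled.
 *
 *	if (aiocancel(&res) == 0)
 *		... canceled; res should not be reported by aiowait() ...
 *	else if (errno == EACCES)
 *		... too late to cancel; reap the result with aiowait() ...
 *	else
 *		... EINVAL: request unknown or everything already done ...
 */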
/*
 * This must be asynch-safe.
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining
				 * time in nanoseconds to microseconds.
				 * Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
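/*
 * Illustrative sketch (hypothetical caller, not part of this file):
 * a bounded wait for the next completion.  A zero-valued timeval
 * selects the pure polling path above; a non-zero timeval is retried
 * with the remaining time, as recomputed at the bottom of the loop.
 *
 *	struct timeval tv;
 *	aio_result_t *donep;
 *
 *	tv.tv_sec = 5;
 *	tv.tv_usec = 0;
 *	donep = aiowait(&tv);
 *	if (donep == NULL)
 *		... timed out ...
 *	else if (donep == (aio_result_t *)-1)
 *		... error; see errno (EINVAL, EINTR) ...
 *	else
 *		... donep points at a completed aio_result_t ...
 */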
/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * into timespec_t *wait.
 */
int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}
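/*
 * Worked example of the borrow case above (values are illustrative):
 * with end = { 5 s, 100000000 ns } and curtime = { 4 s, 900000000 ns },
 * the initial difference gives wait.tv_sec = 1, but end->tv_nsec is
 * smaller than curtime.tv_nsec, so one second is borrowed:
 *
 *	wait.tv_sec  = 1 - 1 = 0
 *	wait.tv_nsec = NANOSEC - (900000000 - 100000000) = 200000000
 *
 * i.e., 5.1s - 4.9s = 0.2s of remaining time.
 */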
/*
 * If closing by file descriptor: we will simply cancel all the
 * outstanding aios and return.  Those aios in question will have either
 * noticed the cancellation notice before, during, or after initiating I/O.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * Cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * Caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}

/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}

int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}

/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * the cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different manners:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows the cancellation of the running
 *	  request with the flag "work_cancel_flg=1"
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, it is allowed to interrupt the worker
 *	  thread running the request (this thread) using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  is just done), then it must disable a possible cancellation
 *	  and proceed to finish the request.  To disable the cancellation
 *	  this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	same procedure as in a)
 *
 * To b)
 *	This thread uses sigsetjmp() to define the position in the code
 *	where it wishes to continue working in the case that a SIGAIOCANCEL
 *	signal is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which again will use the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because "work_cancel_flg" is only manipulated by this worker
 *	thread and it can only run on one CPU at a given time, it is not
 *	necessary to protect that flag with the queue lock.
 *	Returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal and accordingly
 *	the use of the siglongjmp() function to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel and
 *	  blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread acquires "work_qlock1" and returns from
 *	  the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if "work_cancel_flg" is still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire the work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	Then this thread must check the AIO_REQ_CANCELED flag to emulate
 *	the effect of the missed siglongjmp(), freeing the work_qlock1 and
 *	avoiding a deadlock.
 */
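/*
 * Minimal model of the protocol described above (illustrative
 * pseudo-C only; the real code below uses sigon()/sigoff() and the
 * _aio_cancel_on()/_aio_cancel_off() helpers from asyncio.h):
 *
 *	(void) sigsetjmp(aiowp->work_jmp_buf, 0);	restart point
 *	...
 *	cancel window opens: SIGAIOCANCEL handler may siglongjmp()
 *	retval = pread(...);		may be interrupted right here
 *	cancel window closes: handler returns without siglongjmp()
 *	sig_mutex_lock(&aiowp->work_qlock1);	now deadlock-free
 *
 * Keeping the window closed while acquiring work_qlock1 is what
 * prevents the double-acquisition deadlock explained above.
 */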
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;	/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
		 */
		if (aiowp->work_done1)
			_aio_work_done(aiowp);

top:
		/* consume any deferred SIGAIOCANCEL signal here */
		sigon(self);
		sigoff(self);

		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				goto top;
		}
		arg = &reqp->req_args;
		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
		    reqp->req_state == AIO_REQ_CANCELED);
		error = 0;

		switch (reqp->req_op) {
		case AIOREAD:
		case AIOAREAD:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOWRITE:
		case AIOAWRITE:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#if !defined(_LP64)
		case AIOAREAD64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOAWRITE64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#endif	/* !defined(_LP64) */
		case AIOFSYNC:
			if (_aio_fsync_del(aiowp, reqp))
				goto top;
			ASSERT(reqp->req_head == NULL);
			/*
			 * All writes for this fsync request are now
			 * acknowledged.  Now make these writes visible
			 * and put the final request into the hash table.
			 */
			if (reqp->req_state == AIO_REQ_CANCELED) {
				/* EMPTY */;
			} else if (arg->offset == O_SYNC) {
				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
					error = errno;
			} else {
				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
					error = errno;
			}
			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
				aio_panic("_aio_do_request(): AIOFSYNC: "
				    "request already in hash table");
			break;
		default:
			aio_panic("_aio_do_request, bad op");
		}

		_aio_finish_request(aiowp, retval, error);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been cancelled.
 */
static void
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_req) == NULL)
		sig_mutex_unlock(&aiowp->work_qlock1);
	else {
		aiowp->work_req = NULL;
		if (reqp->req_state == AIO_REQ_CANCELED) {
			retval = -1;
			error = ECANCELED;
		}
		if (!POSIX_AIO(reqp)) {
			int notify;
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			/*
			 * If it was canceled, this request will not be
			 * added to the done list.  Just free it.
			 */
			if (error == ECANCELED) {
				_aio_outstand_cnt--;
				_aio_req_free(reqp);
			} else {
				_aio_set_result(reqp, retval, error);
				_aio_req_done_cnt++;
			}
			/*
			 * Notify any thread that may have blocked
			 * because it saw an outstanding request.
			 */
			notify = 0;
			if (_aio_outstand_cnt == 0 && _aiowait_flag) {
				notify = 1;
			}
			sig_mutex_unlock(&__aio_mutex);
			if (notify) {
				(void) _kaio(AIONOTIFY);
			}
		} else {
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			sig_mutex_unlock(&aiowp->work_qlock1);
			_aiodone(reqp, retval, error);
		}
	}
}

void
_aio_req_mark_done(aio_req_t *reqp)
{
#if !defined(_LP64)
	if (reqp->req_largefile)
		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
	else
#endif
		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
}

/*
 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
 * hopefully to consume one of our queued signals.
 */
static void
_aio_delay(int ticks)
{
	(void) usleep(ticks * (MICROSEC / hz));
}

/*
 * Actually send the notifications.
 * We could block indefinitely here if the application
 * is not listening for the signal or port notifications.
 */
static void
send_notification(notif_param_t *npp)
{
	extern int __sigqueue(pid_t pid, int signo,
	    /* const union sigval */ void *value, int si_code, int block);

	if (npp->np_signo)
		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_port >= 0)
		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
		    npp->np_event, npp->np_object, npp->np_user);

	if (npp->np_lio_signo)
		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_lio_port >= 0)
		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
}
/*
 * Asynchronous notification worker.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never cancelled.
	 * All signals remain blocked, forever.
	 */
	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				aio_panic("_aio_do_notify: _aio_idle() failed");
		}
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */

	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {	/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}

/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work
 * and then goes to sleep waiting for more work.
 */
int
_aio_idle(aio_worker_t *aiowp)
{
	int error = 0;

	sig_mutex_lock(&aiowp->work_qlock1);
	if (aiowp->work_count1 == 0) {
		ASSERT(aiowp->work_minload1 == 0);
		aiowp->work_idleflg = 1;
		/*
		 * A cancellation handler is not needed here.
		 * aio worker threads are never cancelled via
		 * pthread_cancel().
		 */
		error = sig_cond_wait(&aiowp->work_idle_cv,
		    &aiowp->work_qlock1);
		/*
		 * The idle flag is normally cleared before the worker is
		 * awakened by aio_req_add().  On error (EINTR), we clear
		 * it ourselves.
		 */
		if (error)
			aiowp->work_idleflg = 0;
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (error);
}

/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}

/*
 * Set the return and errno values for the application's use.
 *
 * For the Posix interfaces, we must set the return value first followed
 * by the errno value because the Posix interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}
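/*
 * Illustrative consumer-side sketch (hypothetical polling code, not
 * part of this file) of why the store order above matters for the
 * Solaris interfaces: a poller watches aio_return and only then reads
 * aio_errno, so the errno store must become visible first.
 *
 *	while (res.aio_return == AIO_INPROGRESS)
 *		... spin or do other work ...
 *	membar_consumer();	(pairs with membar_producer() above)
 *	if (res.aio_return == -1)
 *		... operation failed; res.aio_errno is already valid ...
 */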
/*
 * Set the return and errno values for the application's use.
 *
 * For the POSIX interfaces, we must set the return value first followed
 * by the errno value because the POSIX interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;

	if (POSIX_AIO(reqp)) {
		resultp->aio_return = retval;
		membar_producer();
		resultp->aio_errno = error;
	} else {
		resultp->aio_errno = error;
		membar_producer();
		resultp->aio_return = retval;
	}
}

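/*
 * Illustrative sketch (not code from this file): the membar_producer()
 * calls above pair with a membar_consumer() on the polling side.  For a
 * POSIX request, a reader polling for completion would look like:
 *
 *	while (resultp->aio_errno == EINPROGRESS)
 *		-- spin, yield, or block;
 *	membar_consumer();
 *	retval = resultp->aio_return;	-- guaranteed to be stored already
 *
 * Storing aio_return before aio_errno (and the reverse for the Solaris
 * interfaces, which poll aio_return for AIO_INPROGRESS instead) ensures
 * that the field being polled is always the last one to change.
 */
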
/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
	ulwp_t *self = curthread;
	aio_worker_t *aiowp;
	aio_worker_t *first;
	int load_bal_flg = 1;
	int found;

	ASSERT(reqp->req_state != AIO_REQ_DONEQ);
	reqp->req_next = NULL;
	/*
	 * Try to acquire the next worker's work queue.  If it is locked,
	 * then search the list of workers until a queue is found unlocked,
	 * or until the list is completely traversed at which point another
	 * worker will be created.
	 */
	sigoff(self);		/* defer SIGIO */
	sig_mutex_lock(&__aio_mutex);
	first = aiowp = *nextworker;
	if (mode != AIONOTIFY)
		_aio_outstand_cnt++;
	sig_mutex_unlock(&__aio_mutex);

	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		/* try to find an idle worker */
		found = 0;
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				if (aiowp->work_idleflg) {
					found = 1;
					break;
				}
				sig_mutex_unlock(&aiowp->work_qlock1);
			}
		} while ((aiowp = aiowp->work_forw) != first);

		if (found) {
			aiowp->work_minload1++;
			break;
		}

		/* try to acquire some worker's queue lock */
		do {
			if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
				found = 1;
				break;
			}
		} while ((aiowp = aiowp->work_forw) != first);

		/*
		 * Create more workers when the workers appear overloaded.
		 * Either all the workers are busy draining their queues
		 * or no worker's queue lock could be acquired.
		 */
		if (!found) {
			if (_aio_worker_cnt < _max_workers) {
				if (_aio_create_worker(reqp, mode))
					aio_panic("_aio_req_add: add worker");
				sigon(self);	/* reenable SIGIO */
				return;
			}

			/*
			 * No worker is available and we have already created
			 * _max_workers; keep going through the list slowly
			 * until we acquire some worker's queue lock.
			 */
			while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
				/*
				 * give someone else a chance
				 */
				_aio_delay(1);
				aiowp = aiowp->work_forw;
			}
		}

		ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
		if (_aio_worker_cnt < _max_workers &&
		    aiowp->work_minload1 >= _minworkload) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			*nextworker = aiowp->work_forw;
			sig_mutex_unlock(&__aio_mutex);
			if (_aio_create_worker(reqp, mode))
				aio_panic("_aio_req_add: add worker");
			sigon(self);	/* reenable SIGIO */
			return;
		}
		aiowp->work_minload1++;
		break;
	case AIOFSYNC:
	case AIONOTIFY:
		load_bal_flg = 0;
		sig_mutex_lock(&aiowp->work_qlock1);
		break;
	default:
		aio_panic("_aio_req_add: invalid mode");
		break;
	}
	/*
	 * Put the request onto the worker's work queue.
	 */
	if (aiowp->work_tail1 == NULL) {
		ASSERT(aiowp->work_count1 == 0);
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
	} else {
		aiowp->work_head1->req_next = reqp;
		if (aiowp->work_next1 == NULL)
			aiowp->work_next1 = reqp;
	}
	reqp->req_state = AIO_REQ_QUEUED;
	reqp->req_worker = aiowp;
	aiowp->work_head1 = reqp;
	/*
	 * Awaken the worker if it is not currently active.
	 */
	if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
		aiowp->work_idleflg = 0;
		(void) cond_signal(&aiowp->work_idle_cv);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);

	if (load_bal_flg) {
		sig_mutex_lock(&__aio_mutex);
		*nextworker = aiowp->work_forw;
		sig_mutex_unlock(&__aio_mutex);
	}
	sigon(self);	/* reenable SIGIO */
}

/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_next1) != NULL) {
		/*
		 * Remove a POSIX request from the queue; the
		 * request queue is a singly linked list
		 * with a previous pointer.  The request is
		 * removed by updating the previous pointer.
		 *
		 * Non-POSIX requests are left on the queue
		 * to eventually be placed on the done queue.
		 */
		if (POSIX_AIO(reqp)) {
			if (aiowp->work_prev1 == NULL) {
				aiowp->work_tail1 = reqp->req_next;
				if (aiowp->work_tail1 == NULL)
					aiowp->work_head1 = NULL;
			} else {
				aiowp->work_prev1->req_next = reqp->req_next;
				if (aiowp->work_head1 == reqp)
					aiowp->work_head1 = reqp->req_next;
			}
		} else {
			aiowp->work_prev1 = reqp;
			ASSERT(aiowp->work_done1 >= 0);
			aiowp->work_done1++;
		}
		ASSERT(reqp != reqp->req_next);
		aiowp->work_next1 = reqp->req_next;
		ASSERT(aiowp->work_count1 >= 1);
		aiowp->work_count1--;
		switch (reqp->req_op) {
		case AIOREAD:
		case AIOWRITE:
		case AIOAREAD:
		case AIOAWRITE:
#if !defined(_LP64)
		case AIOAREAD64:
		case AIOAWRITE64:
#endif
			ASSERT(aiowp->work_minload1 > 0);
			aiowp->work_minload1--;
			break;
		}
		reqp->req_state = AIO_REQ_INPROGRESS;
	}
	aiowp->work_req = reqp;
	ASSERT(reqp != NULL || aiowp->work_count1 == 0);
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (reqp);
}

static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
	aio_req_t **last;
	aio_req_t *lastrp;
	aio_req_t *next;

	ASSERT(aiowp != NULL);
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (POSIX_AIO(reqp)) {
		if (ostate != AIO_REQ_QUEUED)
			return;
	}
	last = &aiowp->work_tail1;
	lastrp = aiowp->work_tail1;
	ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
	while ((next = *last) != NULL) {
		if (next == reqp) {
			*last = next->req_next;
			if (aiowp->work_next1 == next)
				aiowp->work_next1 = next->req_next;

			if ((next->req_next != NULL) ||
			    (aiowp->work_done1 == 0)) {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = next->req_next;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = next->req_next;
			} else {
				if (aiowp->work_head1 == next)
					aiowp->work_head1 = lastrp;
				if (aiowp->work_prev1 == next)
					aiowp->work_prev1 = lastrp;
			}

			if (ostate == AIO_REQ_QUEUED) {
				ASSERT(aiowp->work_count1 >= 1);
				aiowp->work_count1--;
				ASSERT(aiowp->work_minload1 >= 1);
				aiowp->work_minload1--;
			} else {
				ASSERT(ostate == AIO_REQ_INPROGRESS &&
				    !POSIX_AIO(reqp));
				aiowp->work_done1--;
			}
			return;
		}
		last = &next->req_next;
		lastrp = next;
	}
	/* NOTREACHED */
}

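/*
 * Illustrative sketch (an assumption about the worker loop, not code
 * from this file): a worker thread is expected to drain its queue with
 * _aio_req_get() and park itself with _aio_idle() when the queue is
 * empty, roughly:
 *
 *	for (;;) {
 *		while ((reqp = _aio_req_get(aiowp)) != NULL)
 *			-- perform the I/O, then report it done
 *		(void) _aio_idle(aiowp);	-- sleep until more work
 *	}
 */
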
static void
_aio_enq_doneq(aio_req_t *reqp)
{
	if (_aio_doneq == NULL) {
		_aio_doneq = reqp;
		reqp->req_next = reqp->req_prev = reqp;
	} else {
		reqp->req_next = _aio_doneq;
		reqp->req_prev = _aio_doneq->req_prev;
		_aio_doneq->req_prev->req_next = reqp;
		_aio_doneq->req_prev = reqp;
	}
	reqp->req_state = AIO_REQ_DONEQ;
	_aio_doneq_cnt++;
}

/*
 * The caller must own __aio_mutex.
 */
aio_req_t *
_aio_req_remove(aio_req_t *reqp)
{
	if (reqp && reqp->req_state != AIO_REQ_DONEQ)
		return (NULL);

	if (reqp) {
		/* request in done queue */
		if (_aio_doneq == reqp)
			_aio_doneq = reqp->req_next;
		if (_aio_doneq == reqp) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			aio_req_t *tmp = reqp->req_next;
			reqp->req_prev->req_next = tmp;
			tmp->req_prev = reqp->req_prev;
		}
	} else if ((reqp = _aio_doneq) != NULL) {
		if (reqp == reqp->req_next) {
			/* only one request on queue */
			_aio_doneq = NULL;
		} else {
			reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
			_aio_doneq->req_prev = reqp->req_prev;
		}
	}
	if (reqp) {
		_aio_doneq_cnt--;
		reqp->req_next = reqp->req_prev = reqp;
		reqp->req_state = AIO_REQ_DONE;
	}
	return (reqp);
}

/*
 * An AIO request is identified by an aio_result_t pointer.  The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 */
static int
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next;

	hashp = _aio_hash + AIOHASH(resultp);
	lmutex_lock(&hashp->hash_lock);
	prev = &hashp->hash_ptr;
	while ((next = *prev) != NULL) {
		if (resultp == next->req_resultp) {
			lmutex_unlock(&hashp->hash_lock);
			return (-1);
		}
		prev = &next->req_link;
	}
	*prev = reqp;
	ASSERT(reqp->req_link == NULL);
	lmutex_unlock(&hashp->hash_lock);
	return (0);
}

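/*
 * Illustrative sketch (not code from this file): the hash table gives
 * O(1) expected-time translation from the aio_result_t embedded in an
 * application's aiocb to the library's aio_req_t.  The lifecycle of an
 * entry, under the AIO_NO_DUPS policy used by _aio_rw() below, is:
 *
 *	_aio_hash_insert(&aiocbp->aio_resultp, reqp);	-- at submission
 *	reqp = _aio_hash_find(&aiocbp->aio_resultp);	-- e.g. to poll
 *	reqp = _aio_hash_del(&aiocbp->aio_resultp);	-- at retirement
 *
 * AIOHASH(resultp) picks the bucket, and each bucket has its own
 * hash_lock, so lookups on different buckets do not contend.
 */
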
/*
 * Remove an entry from the hash table.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp) {
				*prev = next->req_link;
				next->req_link = NULL;
				break;
			}
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * Find an entry in the hash table.
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
	aio_hash_t *hashp;
	aio_req_t **prev;
	aio_req_t *next = NULL;

	if (_aio_hash != NULL) {
		hashp = _aio_hash + AIOHASH(resultp);
		lmutex_lock(&hashp->hash_lock);
		prev = &hashp->hash_ptr;
		while ((next = *prev) != NULL) {
			if (resultp == next->req_resultp)
				break;
			prev = &next->req_link;
		}
		lmutex_unlock(&hashp->hash_lock);
	}
	return (next);
}

/*
 * AIO interface for POSIX
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */
	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 0;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * Reuse the sigevent structure to contain the port number
		 * and the user value.  Same for SIGEV_THREAD, below.
		 */
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		/*
		 * The sigevent structure contains the port number
		 * and the user value.  Same for SIGEV_PORT, above.
		 */
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}

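/*
 * Illustrative sketch (an assumption about the callers, not code from
 * this file): the POSIX entry points are thin wrappers around
 * _aio_rw().  aio_read(), for example, is expected to look roughly
 * like:
 *
 *	int
 *	aio_read(aiocb_t *aiocbp)
 *	{
 *		-- validate aiocbp, then:
 *		return (_aio_rw(aiocbp, NULL, &__nextworker_rw, AIOAREAD,
 *		    (AIO_KAIO | AIO_NO_DUPS)));
 *	}
 *
 * AIO_KAIO asks _aio_rw() to try kernel async I/O first; AIO_NO_DUPS
 * rejects an aiocb whose aio_result_t is already in the hash table,
 * i.e. an aiocb with a request still outstanding.
 */
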
#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	int kerr;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * If we have been called because a list I/O
	 * kaio() failed, we don't want to repeat the
	 * system call.
	 */
	if (flg & AIO_KAIO) {
		/*
		 * Try kernel aio first.
		 * If errno is ENOTSUP/EBADFD,
		 * fall back to the thread implementation.
		 */
		if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
			aiocbp->aio_resultp.aio_errno = EINPROGRESS;
			aiocbp->aio_state = CHECK;
			kerr = (int)_kaio(mode, aiocbp);
			if (kerr == 0)
				return (0);
			if (errno != ENOTSUP && errno != EBADFD) {
				aiocbp->aio_resultp.aio_errno = errno;
				aiocbp->aio_resultp.aio_return = -1;
				aiocbp->aio_state = NOCHECK;
				return (-1);
			}
			if (errno == EBADFD)
				SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		}
	}

	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request.
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
		port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		reqp->req_sigevent.sigev_signo =
		    pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    pn->portnfy_user;
	} else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
	}

	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	ap = &reqp->req_args;
	ap->fd = aiocbp->aio_fildes;
	ap->buf = (caddr_t)aiocbp->aio_buf;
	ap->bufsz = aiocbp->aio_nbytes;
	ap->offset = aiocbp->aio_offset;

	if ((flg & AIO_NO_DUPS) &&
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
		aio_panic("_aio_rw64(): request already in hash table");
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	_aio_req_add(reqp, nextworker, mode);
	return (0);
}
#endif	/* !defined(_LP64) */