/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */

/*
 * Copyright 2006 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

#pragma ident	"%Z%%M%	%I%	%E% SMI"

#include "synonyms.h"
#include "thr_uberdata.h"
#include "asyncio.h"
#include <atomic.h>
#include <sys/param.h>
#include <sys/file.h>
#include <sys/port.h>

static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

extern void _aio_lio_free(aio_lio_t *);

extern int __fdsync(int, int);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);

/*
 * switch for kernel async I/O
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Initialized in _kaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */
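/*
 * Note: request submission round-robins through this circular list;
 * see _aio_req_add(), which advances __nextworker_rw under __aio_mutex
 * and creates additional workers when the existing ones appear busy.
 */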
/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;
int __aio_initbusy = 0;

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;

aio_req_t *_aio_doneq;		/* doubly linked done queue list */

int _aio_donecnt = 0;
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of requests in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is in progress */
int _aio_flags = 0;		/* see asyncio.h defines */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */

static int
_kaio_supported_init(void)
{
	void *ptr;
	size_t size;

	if (_kaio_supported != NULL)	/* already initialized */
		return (0);

	size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
	ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
	    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
	if (ptr == MAP_FAILED)
		return (-1);
	_kaio_supported = ptr;
	return (0);
}
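/*
 * The KAIO_SUPPORTED(), SET_KAIO_NOT_SUPPORTED() and
 * CLEAR_KAIO_SUPPORTED() macros used throughout this file are assumed
 * to consult this mmap()ed array (see asyncio.h for the actual layout);
 * the mapping is created once and never unmapped.
 */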
/*
 * The aio subsystem is initialized when an AIO request is made.
 * Tunables such as the maximum number of workers that the subsystem
 * can create and the minimum number of workers permitted before
 * imposing some restrictions are initialized here.  Also, some
 * workers are created.
 */
int
__uaio_init(void)
{
	int ret = -1;
	int i;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (__uaio_ok) {	/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return (0);
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	hz = (int)sysconf(_SC_CLK_TCK);
	__pid = getpid();

	setup_cancelsig(SIGAIOCANCEL);

	if (_kaio_supported_init() != 0)
		goto out;

	/*
	 * Allocate and initialize the hash table.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (_aio_hash == NULL) {
		/* LINTED pointer cast */
		_aio_hash = (aio_hash_t *)mmap(NULL,
		    HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
		    MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
		if ((void *)_aio_hash == MAP_FAILED) {
			_aio_hash = NULL;
			goto out;
		}
		for (i = 0; i < HASHSZ; i++)
			(void) mutex_init(&_aio_hash[i].hash_lock,
			    USYNC_THREAD, NULL);
	}

	/*
	 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
	 */
	(void) sigfillset(&_worker_set);
	(void) sigdelset(&_worker_set, SIGAIOCANCEL);

	/*
	 * Create one worker to send asynchronous notifications.
	 * Do this only once, even if __uaio_init() is called twice.
	 */
	if (__no_workerscnt == 0 &&
	    (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
		errno = EAGAIN;
		goto out;
	}

	/*
	 * Create the minimum number of read/write workers.
	 * Afterward, check that at least one worker was created;
	 * lwp_create() calls could fail because of segkp exhaustion.
	 */
	for (i = 0; i < _min_workers; i++)
		(void) _aio_create_worker(NULL, AIOREAD);
	if (__rw_workerscnt == 0) {
		errno = EAGAIN;
		goto out;
	}

	ret = 0;
out:
	lmutex_lock(&__aio_initlock);
	if (ret == 0)
		__uaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
	return (ret);
}
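/*
 * The __aio_initlock/__aio_initbusy/__aio_initcv triple above forms a
 * simple barrier: the first caller marks initialization busy and drops
 * the lock while it works, and any late arrivals wait on the condition
 * variable until it broadcasts.  _kaio_init() below uses the same
 * handshake.
 */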
/*
 * Called from close() before actually performing the real _close().
 */
void
_aio_close(int fd)
{
	if (fd < 0)	/* avoid cancelling everything */
		return;
	/*
	 * Cancel all outstanding aio requests for this file descriptor.
	 */
	if (__uaio_ok)
		(void) aiocancel_all(fd);
	/*
	 * If we have allocated the bit array, clear the bit for this file.
	 * The next open may re-use this file descriptor and the new file
	 * may have different kaio() behaviour.
	 */
	if (_kaio_supported != NULL)
		CLEAR_KAIO_SUPPORTED(fd);
}

/*
 * special kaio cleanup thread sits in a loop in the
 * kernel waiting for pending kaio requests to complete.
 */
void *
_kaio_cleanup_thread(void *arg)
{
	if (pthread_setspecific(_aio_key, arg) != 0)
		aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
	(void) _kaio(AIOSTART);
	return (arg);
}

/*
 * initialize kaio.
 */
void
_kaio_init()
{
	int error;
	sigset_t oset;

	lmutex_lock(&__aio_initlock);
	while (__aio_initbusy)
		(void) _cond_wait(&__aio_initcv, &__aio_initlock);
	if (_kaio_ok) {		/* already initialized */
		lmutex_unlock(&__aio_initlock);
		return;
	}
	__aio_initbusy = 1;
	lmutex_unlock(&__aio_initlock);

	if (_kaio_supported_init() != 0)
		error = ENOMEM;
	else if ((_kaiowp = _aio_worker_alloc()) == NULL)
		error = ENOMEM;
	else if ((error = (int)_kaio(AIOINIT)) == 0) {
		(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
		error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
		    _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
		(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	}
	if (error && _kaiowp != NULL) {
		_aio_worker_free(_kaiowp);
		_kaiowp = NULL;
	}

	lmutex_lock(&__aio_initlock);
	if (error)
		_kaio_ok = -1;
	else
		_kaio_ok = 1;
	__aio_initbusy = 0;
	(void) cond_broadcast(&__aio_initcv);
	lmutex_unlock(&__aio_initlock);
}

int
aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
}

int
aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
}

#if !defined(_LP64)
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64));
}

int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64));
}
#endif	/* !defined(_LP64) */
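/*
 * Illustrative use of the interfaces above (hypothetical caller; error
 * handling elided):
 *
 *	aio_result_t res;
 *	char buf[1024];
 *
 *	if (aioread(fd, buf, sizeof (buf), 0L, SEEK_SET, &res) == 0 &&
 *	    aiowait(NULL) == &res && res.aio_return != -1)
 *		... res.aio_return bytes were read into buf ...
 */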
int
_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
    aio_result_t *resultp, int mode)
{
	aio_req_t *reqp;
	aio_args_t *ap;
	offset_t loffset;
	struct stat stat;
	int error = 0;
	int kerr;
	int umode;

	switch (whence) {
	case SEEK_SET:
		loffset = offset;
		break;
	case SEEK_CUR:
		if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
			error = -1;
		else
			loffset += offset;
		break;
	case SEEK_END:
		if (fstat(fd, &stat) == -1)
			error = -1;
		else
			loffset = offset + stat.st_size;
		break;
	default:
		errno = EINVAL;
		error = -1;
	}

	if (error)
		return (error);

	/* initialize kaio */
	if (!_kaio_ok)
		_kaio_init();

	/*
	 * _aio_do_request() needs the original request code (mode) to be
	 * able to choose the appropriate 32/64 bit function.  All other
	 * functions only require the difference between READ and WRITE
	 * (umode).
	 */
	if (mode == AIOAREAD64 || mode == AIOAWRITE64)
		umode = mode - AIOAREAD64;
	else
		umode = mode;

	/*
	 * Try kernel aio first.
	 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
	 */
	if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
		resultp->aio_errno = 0;
		sig_mutex_lock(&__aio_mutex);
		_kaio_outstand_cnt++;
		kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
		    (umode | AIO_POLL_BIT) : umode),
		    fd, buf, bufsz, loffset, resultp);
		if (kerr == 0) {
			sig_mutex_unlock(&__aio_mutex);
			return (0);
		}
		_kaio_outstand_cnt--;
		sig_mutex_unlock(&__aio_mutex);
		if (errno != ENOTSUP && errno != EBADFD)
			return (-1);
		if (errno == EBADFD)
			SET_KAIO_NOT_SUPPORTED(fd);
	}

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	if ((reqp = _aio_req_alloc()) == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * _aio_do_request() checks reqp->req_op to differentiate
	 * between 32 and 64 bit access.
	 */
	reqp->req_op = mode;
	reqp->req_resultp = resultp;
	ap = &reqp->req_args;
	ap->fd = fd;
	ap->buf = buf;
	ap->bufsz = bufsz;
	ap->offset = loffset;

	if (_aio_hash_insert(resultp, reqp) != 0) {
		_aio_req_free(reqp);
		errno = EINVAL;
		return (-1);
	}
	/*
	 * _aio_req_add() only needs the difference between READ and
	 * WRITE to choose the right worker queue.
	 */
	_aio_req_add(reqp, &__nextworker_rw, umode);
	return (0);
}

int
aiocancel(aio_result_t *resultp)
{
	aio_req_t *reqp;
	aio_worker_t *aiowp;
	int ret;
	int done = 0;
	int canceled = 0;

	if (!__uaio_ok) {
		errno = EINVAL;
		return (-1);
	}

	sig_mutex_lock(&__aio_mutex);
	reqp = _aio_hash_find(resultp);
	if (reqp == NULL) {
		if (_aio_outstand_cnt == _aio_req_done_cnt)
			errno = EINVAL;
		else
			errno = EACCES;
		ret = -1;
	} else {
		aiowp = reqp->req_worker;
		sig_mutex_lock(&aiowp->work_qlock1);
		(void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
		sig_mutex_unlock(&aiowp->work_qlock1);

		if (canceled) {
			ret = 0;
		} else {
			if (_aio_outstand_cnt == 0 ||
			    _aio_outstand_cnt == _aio_req_done_cnt)
				errno = EINVAL;
			else
				errno = EACCES;
			ret = -1;
		}
	}
	sig_mutex_unlock(&__aio_mutex);
	return (ret);
}
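/*
 * A note on the sentinel values used below: _kaio(AIOWAIT) and
 * _aio_req_done() return (aio_result_t *)-1 to mean "no request could
 * ever complete" (or an error), NULL to mean "nothing done yet", and
 * the kernel hands back (aio_result_t *)1 when aiowait() was awakened
 * by an aionotify() rather than by a completed request.
 */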
/*
 * This must be asynch safe
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;
	aio_result_t *kresultp;
	aio_result_t *resultp;
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			sig_mutex_unlock(&__aio_mutex);
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}

/*
 * _aio_get_timedelta calculates the remaining time and stores the result
 * into timespec_t *wait.
 */
int
_aio_get_timedelta(timespec_t *end, timespec_t *wait)
{
	int ret = 0;
	struct timeval cur;
	timespec_t curtime;

	(void) gettimeofday(&cur, NULL);
	curtime.tv_sec = cur.tv_sec;
	curtime.tv_nsec = cur.tv_usec * 1000;	/* convert us to ns */

	if (end->tv_sec >= curtime.tv_sec) {
		wait->tv_sec = end->tv_sec - curtime.tv_sec;
		if (end->tv_nsec >= curtime.tv_nsec) {
			wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
			if (wait->tv_sec == 0 && wait->tv_nsec == 0)
				ret = -1;	/* timer expired */
		} else {
			if (end->tv_sec > curtime.tv_sec) {
				wait->tv_sec -= 1;
				wait->tv_nsec = NANOSEC -
				    (curtime.tv_nsec - end->tv_nsec);
			} else {
				ret = -1;	/* timer expired */
			}
		}
	} else {
		ret = -1;
	}
	return (ret);
}
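/*
 * Worked example for _aio_get_timedelta(): with end = {5, 100000000}
 * and a current time of {4, 900000000}, the borrow branch above yields
 * wait = {0, 200000000}, i.e. 0.2 seconds remaining.
 */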
/*
 * If closing by file descriptor: we will simply cancel all the outstanding
 * aio's and return.  Those aio's in question will have either noticed the
 * cancellation notice before, during, or after initiating io.
 */
int
aiocancel_all(int fd)
{
	aio_req_t *reqp;
	aio_req_t **reqpp;
	aio_worker_t *first;
	aio_worker_t *next;
	int canceled = 0;
	int done = 0;
	int cancelall = 0;

	sig_mutex_lock(&__aio_mutex);

	if (_aio_outstand_cnt == 0) {
		sig_mutex_unlock(&__aio_mutex);
		return (AIO_ALLDONE);
	}

	/*
	 * Cancel requests from the read/write workers' queues.
	 */
	first = __nextworker_rw;
	next = first;
	do {
		_aio_cancel_work(next, fd, &canceled, &done);
	} while ((next = next->work_forw) != first);

	/*
	 * Finally, check if there are requests on the done queue that
	 * should be canceled.
	 */
	if (fd < 0)
		cancelall = 1;
	reqpp = &_aio_done_tail;
	while ((reqp = *reqpp) != NULL) {
		if (cancelall || reqp->req_args.fd == fd) {
			*reqpp = reqp->req_next;
			_aio_donecnt--;
			(void) _aio_hash_del(reqp->req_resultp);
			_aio_req_free(reqp);
		} else
			reqpp = &reqp->req_next;
	}
	if (cancelall) {
		ASSERT(_aio_donecnt == 0);
		_aio_done_head = NULL;
	}
	sig_mutex_unlock(&__aio_mutex);

	if (canceled && done == 0)
		return (AIO_CANCELED);
	else if (done && canceled == 0)
		return (AIO_ALLDONE);
	else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
		return ((int)_kaio(AIOCANCEL, fd, NULL));
	return (AIO_NOTCANCELED);
}

/*
 * Cancel requests from a given work queue.  If the file descriptor
 * parameter, fd, is non-negative, then only cancel those requests
 * in this queue that are to this file descriptor.  If the fd
 * parameter is -1, then cancel all requests.
 */
static void
_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	/*
	 * cancel queued requests first.
	 */
	reqp = aiowp->work_tail1;
	while (reqp != NULL) {
		if (fd < 0 || reqp->req_args.fd == fd) {
			if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
				/*
				 * The caller's locks were dropped.
				 * reqp is invalid; start traversing
				 * the list from the beginning again.
				 */
				reqp = aiowp->work_tail1;
				continue;
			}
		}
		reqp = reqp->req_next;
	}
	/*
	 * Since the queued requests have been canceled, there can
	 * only be one in-progress request that should be canceled.
	 */
	if ((reqp = aiowp->work_req) != NULL &&
	    (fd < 0 || reqp->req_args.fd == fd))
		(void) _aio_cancel_req(aiowp, reqp, canceled, done);
	sig_mutex_unlock(&aiowp->work_qlock1);
}
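/*
 * Locking note (see the callers in this file): __aio_mutex is always
 * acquired before a worker's work_qlock1, and _aio_cancel_req() below
 * re-acquires the two locks in that same order after dropping both
 * around its call to _aiodone().
 */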
/*
 * Cancel a request.  Return 1 if the caller's locks were temporarily
 * dropped, otherwise return 0.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		return (0);
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}
int
_aio_create_worker(aio_req_t *reqp, int mode)
{
	aio_worker_t *aiowp, **workers, **nextworker;
	int *aio_workerscnt;
	void *(*func)(void *);
	sigset_t oset;
	int error;

	/*
	 * Put the new worker thread in the right queue.
	 */
	switch (mode) {
	case AIOREAD:
	case AIOWRITE:
	case AIOAREAD:
	case AIOAWRITE:
#if !defined(_LP64)
	case AIOAREAD64:
	case AIOAWRITE64:
#endif
		workers = &__workers_rw;
		nextworker = &__nextworker_rw;
		aio_workerscnt = &__rw_workerscnt;
		func = _aio_do_request;
		break;
	case AIONOTIFY:
		workers = &__workers_no;
		nextworker = &__nextworker_no;
		func = _aio_do_notify;
		aio_workerscnt = &__no_workerscnt;
		break;
	default:
		aio_panic("_aio_create_worker: invalid mode");
		break;
	}

	if ((aiowp = _aio_worker_alloc()) == NULL)
		return (-1);

	if (reqp) {
		reqp->req_state = AIO_REQ_QUEUED;
		reqp->req_worker = aiowp;
		aiowp->work_head1 = reqp;
		aiowp->work_tail1 = reqp;
		aiowp->work_next1 = reqp;
		aiowp->work_count1 = 1;
		aiowp->work_minload1 = 1;
	}

	(void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
	error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
	    THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
	(void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
	if (error) {
		if (reqp) {
			reqp->req_state = 0;
			reqp->req_worker = NULL;
		}
		_aio_worker_free(aiowp);
		return (-1);
	}

	lmutex_lock(&__aio_mutex);
	(*aio_workerscnt)++;
	if (*workers == NULL) {
		aiowp->work_forw = aiowp;
		aiowp->work_backw = aiowp;
		*nextworker = aiowp;
		*workers = aiowp;
	} else {
		aiowp->work_backw = (*workers)->work_backw;
		aiowp->work_forw = (*workers);
		(*workers)->work_backw->work_forw = aiowp;
		(*workers)->work_backw = aiowp;
	}
	_aio_worker_cnt++;
	lmutex_unlock(&__aio_mutex);

	(void) thr_continue(aiowp->work_tid);

	return (0);
}
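/*
 * Design note: the worker above is created THR_SUSPENDED and is only
 * released with thr_continue() after it has been linked into the
 * circular list and the counts have been updated, so a new worker can
 * never observe a half-initialized list.
 */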
/*
 * This is the worker's main routine.
 * The task of this function is to execute all queued requests;
 * once the last pending request is executed this function will block
 * in _aio_idle().  A new incoming request must wake up this thread to
 * restart the work.
 * Every worker has its own work queue.  The queue lock is required
 * to synchronize the addition of new requests for this worker or
 * the cancellation of pending/running requests.
 *
 * Cancellation scenarios:
 * The cancellation of a request is done asynchronously using
 * _aio_cancel_req() from another thread context.
 * A queued request can be cancelled in different ways:
 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
 *	- lock the queue -> remove the request -> unlock the queue
 *	- this function/thread does not detect this cancellation process
 * b) request is in progress (AIO_REQ_INPROGRESS):
 *	- this function first allows cancellation of the running
 *	  request by setting the flag "work_cancel_flg=1";
 *		see _aio_req_get() -> _aio_cancel_on()
 *	  During this phase, the worker thread running the request
 *	  (this thread) may be interrupted using the SIGAIOCANCEL
 *	  signal.
 *	  Once this thread returns from the kernel (because the request
 *	  has just completed), it must disable any further cancellation
 *	  and proceed to finish the request.  To disable cancellation
 *	  this thread uses _aio_cancel_off() to set "work_cancel_flg=0".
 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
 *	  same procedure as in a)
 *
 * Regarding b):
 *	This thread uses sigsetjmp() to record the position in the code
 *	where it wishes to resume working in the case that a SIGAIOCANCEL
 *	signal is detected.
 *	Normally this thread should get the cancellation signal during the
 *	kernel phase (reading or writing).  In that case the signal handler
 *	aiosigcancelhndlr() is activated using the worker thread context,
 *	which in turn uses the siglongjmp() function to break the standard
 *	code flow and jump to the "sigsetjmp" position, provided that
 *	"work_cancel_flg" is set to "1".
 *	Because "work_cancel_flg" is only manipulated by this worker
 *	thread, and the thread can only run on one CPU at a given time,
 *	it is not necessary to protect that flag with the queue lock.
 *	When returning from the kernel (read or write system call) we must
 *	first disable the use of the SIGAIOCANCEL signal, and with it the
 *	use of the siglongjmp() function, to prevent a possible deadlock:
 *	- It can happen that this worker thread returns from the kernel
 *	  and blocks in "work_qlock1",
 *	- then a second thread cancels the apparently "in progress" request
 *	  and sends the SIGAIOCANCEL signal to the worker thread,
 *	- the worker thread acquires the "work_qlock1" and returns from
 *	  the kernel,
 *	- the kernel detects the pending signal and activates the signal
 *	  handler instead,
 *	- if "work_cancel_flg" is still set then the signal handler
 *	  would use siglongjmp() to cancel the "in progress" request and
 *	  would try to acquire the same work_qlock1 in _aio_req_get()
 *	  for a second time => deadlock.
 *	To avoid that situation we disable the cancellation of the request
 *	in progress BEFORE we try to acquire the work_qlock1.
 *	In that case the signal handler will not call siglongjmp() and the
 *	worker thread will continue running the standard code flow.
 *	It must then check for the AIO_REQ_CANCELED state to emulate the
 *	siglongjmp() that would otherwise have been required, releasing
 *	the work_qlock1 and avoiding the deadlock.
 */
void *
_aio_do_request(void *arglist)
{
	aio_worker_t *aiowp = (aio_worker_t *)arglist;
	ulwp_t *self = curthread;
	struct aio_args *arg;
	aio_req_t *reqp;		/* current AIO request */
	ssize_t retval;
	int error;

	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_request, pthread_setspecific()");
	(void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
	ASSERT(aiowp->work_req == NULL);

	/*
	 * We resume here when an operation is cancelled.
	 * On first entry, aiowp->work_req == NULL, so all
	 * we do is block SIGAIOCANCEL.
	 */
	(void) sigsetjmp(aiowp->work_jmp_buf, 0);
	ASSERT(self->ul_sigdefer == 0);

	sigoff(self);	/* block SIGAIOCANCEL */
	if (aiowp->work_req != NULL)
		_aio_finish_request(aiowp, -1, ECANCELED);

	for (;;) {
		/*
		 * Put completed requests on aio_done_list.  This has
		 * to be done as part of the main loop to ensure that
		 * we don't artificially starve any aiowait'ers.
		 */
		if (aiowp->work_done1)
			_aio_work_done(aiowp);
top:
		/* consume any deferred SIGAIOCANCEL signal here */
		sigon(self);
		sigoff(self);

		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				goto top;
		}
		arg = &reqp->req_args;
		ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
		    reqp->req_state == AIO_REQ_CANCELED);
		error = 0;

		switch (reqp->req_op) {
		case AIOREAD:
		case AIOAREAD:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					/*
					 * The file is not seekable
					 * (e.g. a pipe); retry without
					 * an offset.
					 */
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOWRITE:
		case AIOAWRITE:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#if !defined(_LP64)
		case AIOAREAD64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pread64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = read(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
		case AIOAWRITE64:
			sigon(self);	/* unblock SIGAIOCANCEL */
			retval = pwrite64(arg->fd, arg->buf,
			    arg->bufsz, arg->offset);
			if (retval == -1) {
				if (errno == ESPIPE) {
					retval = write(arg->fd,
					    arg->buf, arg->bufsz);
					if (retval == -1)
						error = errno;
				} else {
					error = errno;
				}
			}
			sigoff(self);	/* block SIGAIOCANCEL */
			break;
#endif	/* !defined(_LP64) */
		case AIOFSYNC:
			if (_aio_fsync_del(aiowp, reqp))
				goto top;
			ASSERT(reqp->req_head == NULL);
			/*
			 * All writes for this fsync request are now
			 * acknowledged.  Now make these writes visible
			 * and put the final request into the hash table.
			 */
			if (reqp->req_state == AIO_REQ_CANCELED) {
				/* EMPTY */;
			} else if (arg->offset == O_SYNC) {
				if ((retval = __fdsync(arg->fd, FSYNC)) == -1)
					error = errno;
			} else {
				if ((retval = __fdsync(arg->fd, FDSYNC)) == -1)
					error = errno;
			}
			if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
				aio_panic("_aio_do_request(): AIOFSYNC: "
				    "request already in hash table");
			break;
		default:
			aio_panic("_aio_do_request, bad op");
		}

		_aio_finish_request(aiowp, retval, error);
	}
	/* NOTREACHED */
	return (NULL);
}
/*
 * Perform the tail processing for _aio_do_request().
 * The in-progress request may or may not have been cancelled.
 */
static void
_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	if ((reqp = aiowp->work_req) == NULL)
		sig_mutex_unlock(&aiowp->work_qlock1);
	else {
		aiowp->work_req = NULL;
		if (reqp->req_state == AIO_REQ_CANCELED) {
			retval = -1;
			error = ECANCELED;
		}
		if (!POSIX_AIO(reqp)) {
			sig_mutex_unlock(&aiowp->work_qlock1);
			sig_mutex_lock(&__aio_mutex);
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			_aio_req_done_cnt++;
			_aio_set_result(reqp, retval, error);
			if (error == ECANCELED)
				_aio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
		} else {
			if (reqp->req_state == AIO_REQ_INPROGRESS)
				reqp->req_state = AIO_REQ_DONE;
			sig_mutex_unlock(&aiowp->work_qlock1);
			_aiodone(reqp, retval, error);
		}
	}
}

void
_aio_req_mark_done(aio_req_t *reqp)
{
#if !defined(_LP64)
	if (reqp->req_largefile)
		((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
	else
#endif
		((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
}

/*
 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
 * hopefully to consume one of our queued signals.
 */
static void
_aio_delay(int ticks)
{
	(void) usleep(ticks * (MICROSEC / hz));
}

/*
 * Actually send the notifications.
 * We could block indefinitely here if the application
 * is not listening for the signal or port notifications.
 */
static void
send_notification(notif_param_t *npp)
{
	extern int __sigqueue(pid_t pid, int signo,
	    /* const union sigval */ void *value, int si_code, int block);

	if (npp->np_signo)
		(void) __sigqueue(__pid, npp->np_signo, npp->np_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_port >= 0)
		(void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
		    npp->np_event, npp->np_object, npp->np_user);

	if (npp->np_lio_signo)
		(void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
		    SI_ASYNCIO, 1);
	else if (npp->np_lio_port >= 0)
		(void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
		    npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
}
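/*
 * The trailing argument to __sigqueue() above is a block flag; it
 * appears to request a blocking enqueue when the signal queue is full
 * (this is a private libc interface), which is why the comment above
 * warns that send_notification() could block indefinitely.
 */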
/*
 * Asynchronous notification worker.
 */
void *
_aio_do_notify(void *arg)
{
	aio_worker_t *aiowp = (aio_worker_t *)arg;
	aio_req_t *reqp;

	/*
	 * This isn't really necessary.  All signals are blocked.
	 */
	if (pthread_setspecific(_aio_key, aiowp) != 0)
		aio_panic("_aio_do_notify, pthread_setspecific()");

	/*
	 * Notifications are never cancelled.
	 * All signals remain blocked, forever.
	 */
	for (;;) {
		while ((reqp = _aio_req_get(aiowp)) == NULL) {
			if (_aio_idle(aiowp) != 0)
				aio_panic("_aio_do_notify: _aio_idle() failed");
		}
		send_notification(&reqp->req_notify);
		_aio_req_free(reqp);
	}

	/* NOTREACHED */
	return (NULL);
}

/*
 * Do the completion semantics for a request that was either canceled
 * by _aio_cancel_req() or was completed by _aio_do_request().
 */
static void
_aiodone(aio_req_t *reqp, ssize_t retval, int error)
{
	aio_result_t *resultp = reqp->req_resultp;
	int notify = 0;
	aio_lio_t *head;
	int sigev_none;
	int sigev_signal;
	int sigev_thread;
	int sigev_port;
	notif_param_t np;

	/*
	 * We call _aiodone() only for Posix I/O.
	 */
	ASSERT(POSIX_AIO(reqp));

	sigev_none = 0;
	sigev_signal = 0;
	sigev_thread = 0;
	sigev_port = 0;
	np.np_signo = 0;
	np.np_port = -1;
	np.np_lio_signo = 0;
	np.np_lio_port = -1;

	switch (reqp->req_sigevent.sigev_notify) {
	case SIGEV_NONE:
		sigev_none = 1;
		break;
	case SIGEV_SIGNAL:
		sigev_signal = 1;
		break;
	case SIGEV_THREAD:
		sigev_thread = 1;
		break;
	case SIGEV_PORT:
		sigev_port = 1;
		break;
	default:
		aio_panic("_aiodone: improper sigev_notify");
		break;
	}

	/*
	 * Figure out the notification parameters while holding __aio_mutex.
	 * Actually perform the notifications after dropping __aio_mutex.
	 * This allows us to sleep for a long time (if the notifications
	 * incur delays) without impeding other async I/O operations.
	 */
	sig_mutex_lock(&__aio_mutex);

	if (sigev_signal) {
		if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
			notify = 1;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	} else if (sigev_thread | sigev_port) {
		if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
			notify = 1;
		np.np_event = reqp->req_op;
		if (np.np_event == AIOFSYNC && reqp->req_largefile)
			np.np_event = AIOFSYNC64;
		np.np_object = (uintptr_t)reqp->req_aiocbp;
		np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
	}

	if (resultp->aio_errno == EINPROGRESS)
		_aio_set_result(reqp, retval, error);

	_aio_outstand_cnt--;

	head = reqp->req_head;
	reqp->req_head = NULL;

	if (sigev_none) {
		_aio_enq_doneq(reqp);
		reqp = NULL;
	} else {
		(void) _aio_hash_del(resultp);
		_aio_req_mark_done(reqp);
	}

	_aio_waitn_wakeup();

	/*
	 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
	 * __aio_suspend() increments "_aio_kernel_suspend"
	 * when they are waiting in the kernel for completed I/Os.
	 *
	 * _kaio(AIONOTIFY) awakes the corresponding function
	 * in the kernel; then the corresponding __aio_waitn() or
	 * __aio_suspend() function could reap the recently
	 * completed I/Os (_aiodone()).
	 */
	if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
		(void) _kaio(AIONOTIFY);

	sig_mutex_unlock(&__aio_mutex);

	if (head != NULL) {
		/*
		 * If all the lio requests have completed,
		 * prepare to notify the waiting thread.
		 */
		sig_mutex_lock(&head->lio_mutex);
		ASSERT(head->lio_refcnt == head->lio_nent);
		if (head->lio_refcnt == 1) {
			int waiting = 0;
			if (head->lio_mode == LIO_WAIT) {
				if ((waiting = head->lio_waiting) != 0)
					(void) cond_signal(&head->lio_cond_cv);
			} else if (head->lio_port < 0) { /* none or signal */
				if ((np.np_lio_signo = head->lio_signo) != 0)
					notify = 1;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			} else {	/* thread or port */
				notify = 1;
				np.np_lio_port = head->lio_port;
				np.np_lio_event = head->lio_event;
				np.np_lio_object =
				    (uintptr_t)head->lio_sigevent;
				np.np_lio_user = head->lio_sigval.sival_ptr;
			}
			head->lio_nent = head->lio_refcnt = 0;
			sig_mutex_unlock(&head->lio_mutex);
			if (waiting == 0)
				_aio_lio_free(head);
		} else {
			head->lio_nent--;
			head->lio_refcnt--;
			sig_mutex_unlock(&head->lio_mutex);
		}
	}

	/*
	 * The request is completed; now perform the notifications.
	 */
	if (notify) {
		if (reqp != NULL) {
			/*
			 * We usually put the request on the notification
			 * queue because we don't want to block and delay
			 * other operations behind us in the work queue.
			 * Also we must never block on a cancel notification
			 * because we are being called from an application
			 * thread in this case and that could lead to deadlock
			 * if no other thread is receiving notifications.
			 */
			reqp->req_notify = np;
			reqp->req_op = AIONOTIFY;
			_aio_req_add(reqp, &__workers_no, AIONOTIFY);
			reqp = NULL;
		} else {
			/*
			 * We already put the request on the done queue,
			 * so we can't queue it to the notification queue.
			 * Just do the notification directly.
			 */
			send_notification(&np);
		}
	}

	if (reqp != NULL)
		_aio_req_free(reqp);
}

/*
 * Delete fsync requests from list head until there is
 * only one left.  Return 0 when there is only one,
 * otherwise return a non-zero value.
 */
static int
_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
{
	aio_lio_t *head = reqp->req_head;
	int rval = 0;

	ASSERT(reqp == aiowp->work_req);
	sig_mutex_lock(&aiowp->work_qlock1);
	sig_mutex_lock(&head->lio_mutex);
	if (head->lio_refcnt > 1) {
		head->lio_refcnt--;
		head->lio_nent--;
		aiowp->work_req = NULL;
		sig_mutex_unlock(&head->lio_mutex);
		sig_mutex_unlock(&aiowp->work_qlock1);
		sig_mutex_lock(&__aio_mutex);
		_aio_outstand_cnt--;
		_aio_waitn_wakeup();
		sig_mutex_unlock(&__aio_mutex);
		_aio_req_free(reqp);
		return (1);
	}
	ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
	reqp->req_head = NULL;
	if (head->lio_canned)
		reqp->req_state = AIO_REQ_CANCELED;
	if (head->lio_mode == LIO_DESTROY) {
		aiowp->work_req = NULL;
		rval = 1;
	}
	sig_mutex_unlock(&head->lio_mutex);
	sig_mutex_unlock(&aiowp->work_qlock1);
	head->lio_refcnt--;
	head->lio_nent--;
	_aio_lio_free(head);
	if (rval != 0)
		_aio_req_free(reqp);
	return (rval);
}

/*
 * A worker is set idle when its work queue is empty.
 * The worker checks again that it has no more work
 * and then goes to sleep waiting for more work.
 */
int
_aio_idle(aio_worker_t *aiowp)
{
	int error = 0;

	sig_mutex_lock(&aiowp->work_qlock1);
	if (aiowp->work_count1 == 0) {
		ASSERT(aiowp->work_minload1 == 0);
		aiowp->work_idleflg = 1;
		/*
		 * A cancellation handler is not needed here.
		 * aio worker threads are never cancelled via pthread_cancel().
		 */
		error = sig_cond_wait(&aiowp->work_idle_cv,
		    &aiowp->work_qlock1);
		/*
		 * The idle flag is normally cleared before the worker is
		 * awakened by _aio_req_add().  On error (EINTR), we clear
		 * it ourselves.
		 */
		if (error)
			aiowp->work_idleflg = 0;
	}
	sig_mutex_unlock(&aiowp->work_qlock1);
	return (error);
}

/*
 * A worker's completed AIO requests are placed onto a global
 * done queue.  The application is only sent a SIGIO signal if
 * the process has a handler enabled and it is not waiting via
 * aiowait().
 */
static void
_aio_work_done(aio_worker_t *aiowp)
{
	aio_req_t *reqp;

	sig_mutex_lock(&aiowp->work_qlock1);
	reqp = aiowp->work_prev1;
	reqp->req_next = NULL;
	aiowp->work_done1 = 0;
	aiowp->work_tail1 = aiowp->work_next1;
	if (aiowp->work_tail1 == NULL)
		aiowp->work_head1 = NULL;
	aiowp->work_prev1 = NULL;
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_lock(&__aio_mutex);
	_aio_donecnt++;
	_aio_outstand_cnt--;
	_aio_req_done_cnt--;
	ASSERT(_aio_donecnt > 0 &&
	    _aio_outstand_cnt >= 0 &&
	    _aio_req_done_cnt >= 0);
	ASSERT(reqp != NULL);

	if (_aio_done_tail == NULL) {
		_aio_done_head = _aio_done_tail = reqp;
	} else {
		_aio_done_head->req_next = reqp;
		_aio_done_head = reqp;
	}

	if (_aiowait_flag) {
		sig_mutex_unlock(&__aio_mutex);
		(void) _kaio(AIONOTIFY);
	} else {
		sig_mutex_unlock(&__aio_mutex);
		if (_sigio_enabled)
			(void) kill(__pid, SIGIO);
	}
}

/*
 * The done queue consists of AIO requests that are in either the
 * AIO_REQ_DONE or AIO_REQ_CANCELED state.  Requests that were cancelled
 * are discarded.  If the done queue is empty then NULL is returned.
 * Otherwise the address of a done aio_result_t is returned.
 */
aio_result_t *
_aio_req_done(void)
{
	aio_req_t *reqp;
	aio_result_t *resultp;

	ASSERT(MUTEX_HELD(&__aio_mutex));

	if ((reqp = _aio_done_tail) != NULL) {
		if ((_aio_done_tail = reqp->req_next) == NULL)
			_aio_done_head = NULL;
		ASSERT(_aio_donecnt > 0);
		_aio_donecnt--;
		(void) _aio_hash_del(reqp->req_resultp);
		resultp = reqp->req_resultp;
		ASSERT(reqp->req_state == AIO_REQ_DONE);
		_aio_req_free(reqp);
		return (resultp);
	}
	/* is queue empty? */
	if (reqp == NULL && _aio_outstand_cnt == 0) {
		return ((aio_result_t *)-1);
	}
	return (NULL);
}
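/*
 * Illustrative only: given the store ordering enforced below, a caller
 * of the Solaris interfaces can poll for completion with something like
 *
 *	while (resultp->aio_return == AIO_INPROGRESS)
 *		;
 *
 * while a Posix caller polls aio_errno against EINPROGRESS instead.
 */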

/*
 * Set the return and errno values for the application's use.
 *
 * For the POSIX interfaces, we must set the return value first and
 * then the errno value, because the POSIX interfaces allow for a change
 * in the errno value from EINPROGRESS to something else to signal
 * the completion of the asynchronous request.
 *
 * The opposite is true for the Solaris interfaces.  These allow for
 * a change in the return value from AIO_INPROGRESS to something else
 * to signal the completion of the asynchronous request.
 */
void
_aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
{
    aio_result_t *resultp = reqp->req_resultp;

    if (POSIX_AIO(reqp)) {
        resultp->aio_return = retval;
        membar_producer();
        resultp->aio_errno = error;
    } else {
        resultp->aio_errno = error;
        membar_producer();
        resultp->aio_return = retval;
    }
}
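
/*
 * Illustrative sketch (not part of this file): the producer barriers
 * above imply that a polling reader must inspect the fields in the
 * opposite order, with a matching consumer barrier.  For the POSIX
 * ordering, a hypothetical helper could look like:
 *
 *      #include <atomic.h>
 *      #include <errno.h>
 *
 *      static int
 *      posix_result_is_done(volatile aio_result_t *resultp)
 *      {
 *          if (resultp->aio_errno == EINPROGRESS)
 *              return (0);
 *          membar_consumer();  (pairs with membar_producer() above)
 *          return (1);         (aio_return is now safe to read)
 *      }
 *
 * The Solaris-style reader is symmetric: poll aio_return for a value
 * other than AIO_INPROGRESS, then read aio_errno after the barrier.
 */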

/*
 * Add an AIO request onto the next work queue.
 * A circular list of workers is used to choose the next worker.
 */
void
_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
{
    ulwp_t *self = curthread;
    aio_worker_t *aiowp;
    aio_worker_t *first;
    int load_bal_flg = 1;
    int found;

    ASSERT(reqp->req_state != AIO_REQ_DONEQ);
    reqp->req_next = NULL;
    /*
     * Try to acquire the next worker's work queue.  If it is locked,
     * then search the list of workers until a queue is found unlocked,
     * or until the list is completely traversed, at which point another
     * worker will be created.
     */
    sigoff(self);       /* defer SIGIO */
    sig_mutex_lock(&__aio_mutex);
    first = aiowp = *nextworker;
    if (mode != AIONOTIFY)
        _aio_outstand_cnt++;
    sig_mutex_unlock(&__aio_mutex);

    switch (mode) {
    case AIOREAD:
    case AIOWRITE:
    case AIOAREAD:
    case AIOAWRITE:
#if !defined(_LP64)
    case AIOAREAD64:
    case AIOAWRITE64:
#endif
        /* try to find an idle worker */
        found = 0;
        do {
            if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
                if (aiowp->work_idleflg) {
                    found = 1;
                    break;
                }
                sig_mutex_unlock(&aiowp->work_qlock1);
            }
        } while ((aiowp = aiowp->work_forw) != first);

        if (found) {
            aiowp->work_minload1++;
            break;
        }

        /* try to acquire some worker's queue lock */
        do {
            if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
                found = 1;
                break;
            }
        } while ((aiowp = aiowp->work_forw) != first);

        /*
         * Create more workers when the workers appear overloaded.
         * Either all the workers are busy draining their queues
         * or no worker's queue lock could be acquired.
         */
        if (!found) {
            if (_aio_worker_cnt < _max_workers) {
                if (_aio_create_worker(reqp, mode))
                    aio_panic("_aio_req_add: add worker");
                sigon(self);        /* reenable SIGIO */
                return;
            }

            /*
             * No worker is available and we have already created
             * _max_workers; keep going through the list slowly
             * until we get a lock.
             */
            while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
                /*
                 * give someone else a chance
                 */
                _aio_delay(1);
                aiowp = aiowp->work_forw;
            }
        }

        ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
        if (_aio_worker_cnt < _max_workers &&
            aiowp->work_minload1 >= _minworkload) {
            sig_mutex_unlock(&aiowp->work_qlock1);
            sig_mutex_lock(&__aio_mutex);
            *nextworker = aiowp->work_forw;
            sig_mutex_unlock(&__aio_mutex);
            if (_aio_create_worker(reqp, mode))
                aio_panic("_aio_req_add: add worker");
            sigon(self);        /* reenable SIGIO */
            return;
        }
        aiowp->work_minload1++;
        break;
    case AIOFSYNC:
    case AIONOTIFY:
        load_bal_flg = 0;
        sig_mutex_lock(&aiowp->work_qlock1);
        break;
    default:
        aio_panic("_aio_req_add: invalid mode");
        break;
    }
    /*
     * Put request onto worker's work queue.
     */
    if (aiowp->work_tail1 == NULL) {
        ASSERT(aiowp->work_count1 == 0);
        aiowp->work_tail1 = reqp;
        aiowp->work_next1 = reqp;
    } else {
        aiowp->work_head1->req_next = reqp;
        if (aiowp->work_next1 == NULL)
            aiowp->work_next1 = reqp;
    }
    reqp->req_state = AIO_REQ_QUEUED;
    reqp->req_worker = aiowp;
    aiowp->work_head1 = reqp;
    /*
     * Awaken worker if it is not currently active.
     */
    if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
        aiowp->work_idleflg = 0;
        (void) cond_signal(&aiowp->work_idle_cv);
    }
    sig_mutex_unlock(&aiowp->work_qlock1);

    if (load_bal_flg) {
        sig_mutex_lock(&__aio_mutex);
        *nextworker = aiowp->work_forw;
        sig_mutex_unlock(&__aio_mutex);
    }
    sigon(self);        /* reenable SIGIO */
}
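
/*
 * Illustrative sketch (not part of this file): because the worker list
 * is circular, the load balancing in _aio_req_add() reduces to a
 * one-pointer round-robin.  A stripped-down, hypothetical version:
 *
 *      typedef struct worker {
 *          struct worker *work_forw;   (circular forward link)
 *      } worker_t;
 *
 *      static worker_t *
 *      pick_and_advance(worker_t **nextp)
 *      {
 *          worker_t *w = *nextp;
 *
 *          *nextp = w->work_forw;      (next caller starts further on)
 *          return (w);
 *      }
 *
 * The real code additionally holds __aio_mutex while advancing
 * *nextworker, and prefers an idle worker over strict rotation.
 */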

/*
 * Get an AIO request for a specified worker.
 * If the work queue is empty, return NULL.
 */
aio_req_t *
_aio_req_get(aio_worker_t *aiowp)
{
    aio_req_t *reqp;

    sig_mutex_lock(&aiowp->work_qlock1);
    if ((reqp = aiowp->work_next1) != NULL) {
        /*
         * Remove a POSIX request from the queue; the
         * request queue is a singly linked list
         * with a previous pointer.  The request is
         * removed by updating the previous pointer.
         *
         * Non-POSIX requests are left on the queue
         * to eventually be placed on the done queue.
         */
        if (POSIX_AIO(reqp)) {
            if (aiowp->work_prev1 == NULL) {
                aiowp->work_tail1 = reqp->req_next;
                if (aiowp->work_tail1 == NULL)
                    aiowp->work_head1 = NULL;
            } else {
                aiowp->work_prev1->req_next = reqp->req_next;
                if (aiowp->work_head1 == reqp)
                    aiowp->work_head1 = reqp->req_next;
            }
        } else {
            aiowp->work_prev1 = reqp;
            ASSERT(aiowp->work_done1 >= 0);
            aiowp->work_done1++;
        }
        ASSERT(reqp != reqp->req_next);
        aiowp->work_next1 = reqp->req_next;
        ASSERT(aiowp->work_count1 >= 1);
        aiowp->work_count1--;
        switch (reqp->req_op) {
        case AIOREAD:
        case AIOWRITE:
        case AIOAREAD:
        case AIOAWRITE:
#if !defined(_LP64)
        case AIOAREAD64:
        case AIOAWRITE64:
#endif
            ASSERT(aiowp->work_minload1 > 0);
            aiowp->work_minload1--;
            break;
        }
        reqp->req_state = AIO_REQ_INPROGRESS;
    }
    aiowp->work_req = reqp;
    ASSERT(reqp != NULL || aiowp->work_count1 == 0);
    sig_mutex_unlock(&aiowp->work_qlock1);
    return (reqp);
}
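
/*
 * Illustrative sketch (not part of this file): for POSIX requests,
 * the dequeue in _aio_req_get() is a plain pop from the head of a
 * singly linked list.  In isolation, with hypothetical names:
 *
 *      typedef struct req {
 *          struct req *req_next;
 *      } req_t;
 *
 *      static req_t *
 *      queue_pop(req_t **headp)
 *      {
 *          req_t *r = *headp;
 *
 *          if (r != NULL)
 *              *headp = r->req_next;
 *          return (r);
 *      }
 *
 * The real queue also maintains work_head1, work_prev1 and the load
 * counters, all under work_qlock1.
 */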

/*
 * Remove a request from a worker's work queue, given its prior state
 * (AIO_REQ_QUEUED or AIO_REQ_INPROGRESS).  The caller must hold the
 * worker's work_qlock1.
 */
static void
_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
{
    aio_req_t **last;
    aio_req_t *lastrp;
    aio_req_t *next;

    ASSERT(aiowp != NULL);
    ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
    if (POSIX_AIO(reqp)) {
        if (ostate != AIO_REQ_QUEUED)
            return;
    }
    last = &aiowp->work_tail1;
    lastrp = aiowp->work_tail1;
    ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
    while ((next = *last) != NULL) {
        if (next == reqp) {
            *last = next->req_next;
            if (aiowp->work_next1 == next)
                aiowp->work_next1 = next->req_next;

            if ((next->req_next != NULL) ||
                (aiowp->work_done1 == 0)) {
                if (aiowp->work_head1 == next)
                    aiowp->work_head1 = next->req_next;
                if (aiowp->work_prev1 == next)
                    aiowp->work_prev1 = next->req_next;
            } else {
                if (aiowp->work_head1 == next)
                    aiowp->work_head1 = lastrp;
                if (aiowp->work_prev1 == next)
                    aiowp->work_prev1 = lastrp;
            }

            if (ostate == AIO_REQ_QUEUED) {
                ASSERT(aiowp->work_count1 >= 1);
                aiowp->work_count1--;
                ASSERT(aiowp->work_minload1 >= 1);
                aiowp->work_minload1--;
            } else {
                ASSERT(ostate == AIO_REQ_INPROGRESS &&
                    !POSIX_AIO(reqp));
                aiowp->work_done1--;
            }
            return;
        }
        last = &next->req_next;
        lastrp = next;
    }
    /* NOTREACHED */
}

/*
 * Append a request to the circular, doubly linked done queue.
 */
static void
_aio_enq_doneq(aio_req_t *reqp)
{
    if (_aio_doneq == NULL) {
        _aio_doneq = reqp;
        reqp->req_next = reqp->req_prev = reqp;
    } else {
        reqp->req_next = _aio_doneq;
        reqp->req_prev = _aio_doneq->req_prev;
        _aio_doneq->req_prev->req_next = reqp;
        _aio_doneq->req_prev = reqp;
    }
    reqp->req_state = AIO_REQ_DONEQ;
    _aio_doneq_cnt++;
}
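
/*
 * Illustrative sketch (not part of this file): the else-branch of
 * _aio_enq_doneq() is the standard four-pointer splice that inserts a
 * node at the tail of a circular doubly linked list whose head is the
 * oldest element:
 *
 *      new->next = head;
 *      new->prev = head->prev;
 *      head->prev->next = new;
 *      head->prev = new;
 *
 * (hypothetical node names; reqp plays the role of new and _aio_doneq
 * the role of head above.)
 */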

/*
 * Remove a request from the done queue.  If reqp is NULL, remove
 * whatever request is at the head of the queue.  The caller must
 * hold __aio_mutex.
 */
aio_req_t *
_aio_req_remove(aio_req_t *reqp)
{
    if (reqp && reqp->req_state != AIO_REQ_DONEQ)
        return (NULL);

    if (reqp) {
        /* request in done queue */
        if (_aio_doneq == reqp)
            _aio_doneq = reqp->req_next;
        if (_aio_doneq == reqp) {
            /* only one request on queue */
            _aio_doneq = NULL;
        } else {
            aio_req_t *tmp = reqp->req_next;
            reqp->req_prev->req_next = tmp;
            tmp->req_prev = reqp->req_prev;
        }
    } else if ((reqp = _aio_doneq) != NULL) {
        if (reqp == reqp->req_next) {
            /* only one request on queue */
            _aio_doneq = NULL;
        } else {
            reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
            _aio_doneq->req_prev = reqp->req_prev;
        }
    }
    if (reqp) {
        _aio_doneq_cnt--;
        reqp->req_next = reqp->req_prev = reqp;
        reqp->req_state = AIO_REQ_DONE;
    }
    return (reqp);
}

/*
 * An AIO request is identified by an aio_result_t pointer.  The library
 * maps this aio_result_t pointer to its internal representation using a
 * hash table.  This function adds an aio_result_t pointer to the hash table.
 */
static int
_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
{
    aio_hash_t *hashp;
    aio_req_t **prev;
    aio_req_t *next;

    hashp = _aio_hash + AIOHASH(resultp);
    lmutex_lock(&hashp->hash_lock);
    prev = &hashp->hash_ptr;
    while ((next = *prev) != NULL) {
        if (resultp == next->req_resultp) {
            lmutex_unlock(&hashp->hash_lock);
            return (-1);
        }
        prev = &next->req_link;
    }
    *prev = reqp;
    ASSERT(reqp->req_link == NULL);
    lmutex_unlock(&hashp->hash_lock);
    return (0);
}
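
/*
 * Illustrative sketch (not part of this file): the bucket walk in
 * _aio_hash_insert() uses the pointer-to-pointer idiom, so appending
 * to an empty bucket needs no special case.  In isolation, with
 * hypothetical types:
 *
 *      typedef struct node {
 *          struct node *link;
 *          void *key;
 *      } node_t;
 *
 *      static int
 *      bucket_insert(node_t **bucketp, node_t *np)
 *      {
 *          node_t **prev = bucketp;
 *          node_t *next;
 *
 *          while ((next = *prev) != NULL) {
 *              if (next->key == np->key)
 *                  return (-1);    (duplicate key)
 *              prev = &next->link;
 *          }
 *          *prev = np;     (works for empty and non-empty buckets)
 *          return (0);
 *      }
 */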

/*
 * Remove an entry from the hash table.
 */
aio_req_t *
_aio_hash_del(aio_result_t *resultp)
{
    aio_hash_t *hashp;
    aio_req_t **prev;
    aio_req_t *next = NULL;

    if (_aio_hash != NULL) {
        hashp = _aio_hash + AIOHASH(resultp);
        lmutex_lock(&hashp->hash_lock);
        prev = &hashp->hash_ptr;
        while ((next = *prev) != NULL) {
            if (resultp == next->req_resultp) {
                *prev = next->req_link;
                next->req_link = NULL;
                break;
            }
            prev = &next->req_link;
        }
        lmutex_unlock(&hashp->hash_lock);
    }
    return (next);
}

/*
 * Find an entry in the hash table.
 */
aio_req_t *
_aio_hash_find(aio_result_t *resultp)
{
    aio_hash_t *hashp;
    aio_req_t **prev;
    aio_req_t *next = NULL;

    if (_aio_hash != NULL) {
        hashp = _aio_hash + AIOHASH(resultp);
        lmutex_lock(&hashp->hash_lock);
        prev = &hashp->hash_ptr;
        while ((next = *prev) != NULL) {
            if (resultp == next->req_resultp)
                break;
            prev = &next->req_link;
        }
        lmutex_unlock(&hashp->hash_lock);
    }
    return (next);
}
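
/*
 * Illustrative sketch (not part of this file): AIOHASH() (defined in
 * asyncio.h) maps an aio_result_t pointer to a bucket index.  Any
 * reasonable pointer hash works; a hypothetical stand-in for a
 * power-of-two table size:
 *
 *      #define HASHSZ      1024    (must be a power of two)
 *      #define PTRHASH(p)  ((((uintptr_t)(p)) >> 4) & (HASHSZ - 1))
 *
 * The right shift discards low-order bits that are always zero due to
 * structure alignment.
 */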

/*
 * AIO interface for POSIX
 */
int
_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
    aio_req_t *reqp;
    aio_args_t *ap;
    int kerr;

    if (aiocbp == NULL) {
        errno = EINVAL;
        return (-1);
    }

    /* initialize kaio */
    if (!_kaio_ok)
        _kaio_init();

    aiocbp->aio_state = NOCHECK;

    /*
     * If we have been called because a list I/O
     * kaio() failed, we don't want to repeat the
     * system call.
     */
    if (flg & AIO_KAIO) {
        /*
         * Try kernel aio first.
         * If errno is ENOTSUP/EBADFD,
         * fall back to the thread implementation.
         */
        if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
            aiocbp->aio_resultp.aio_errno = EINPROGRESS;
            aiocbp->aio_state = CHECK;
            kerr = (int)_kaio(mode, aiocbp);
            if (kerr == 0)
                return (0);
            if (errno != ENOTSUP && errno != EBADFD) {
                aiocbp->aio_resultp.aio_errno = errno;
                aiocbp->aio_resultp.aio_return = -1;
                aiocbp->aio_state = NOCHECK;
                return (-1);
            }
            if (errno == EBADFD)
                SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
        }
    }

    aiocbp->aio_resultp.aio_errno = EINPROGRESS;
    aiocbp->aio_state = USERAIO;

    if (!__uaio_ok && __uaio_init() == -1)
        return (-1);

    if ((reqp = _aio_req_alloc()) == NULL) {
        errno = EAGAIN;
        return (-1);
    }

    /*
     * If this is a list I/O request, record the list head
     * in the aio request.
     */
    reqp->req_head = lio_head;
    reqp->req_type = AIO_POSIX_REQ;
    reqp->req_op = mode;
    reqp->req_largefile = 0;

    if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
        reqp->req_sigevent.sigev_notify = SIGEV_NONE;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
        reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
        reqp->req_sigevent.sigev_signo =
            aiocbp->aio_sigevent.sigev_signo;
        reqp->req_sigevent.sigev_value.sival_ptr =
            aiocbp->aio_sigevent.sigev_value.sival_ptr;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
        port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
        reqp->req_sigevent.sigev_notify = SIGEV_PORT;
        /*
         * Reuse the sigevent structure to contain the port number
         * and the user value.  Same for SIGEV_THREAD, below.
         */
        reqp->req_sigevent.sigev_signo =
            pn->portnfy_port;
        reqp->req_sigevent.sigev_value.sival_ptr =
            pn->portnfy_user;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
        reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
        /*
         * The sigevent structure contains the port number
         * and the user value.  Same for SIGEV_PORT, above.
         */
        reqp->req_sigevent.sigev_signo =
            aiocbp->aio_sigevent.sigev_signo;
        reqp->req_sigevent.sigev_value.sival_ptr =
            aiocbp->aio_sigevent.sigev_value.sival_ptr;
    }

    reqp->req_resultp = &aiocbp->aio_resultp;
    reqp->req_aiocbp = aiocbp;
    ap = &reqp->req_args;
    ap->fd = aiocbp->aio_fildes;
    ap->buf = (caddr_t)aiocbp->aio_buf;
    ap->bufsz = aiocbp->aio_nbytes;
    ap->offset = aiocbp->aio_offset;

    if ((flg & AIO_NO_DUPS) &&
        _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
        aio_panic("_aio_rw(): request already in hash table");
        _aio_req_free(reqp);
        errno = EINVAL;
        return (-1);
    }
    _aio_req_add(reqp, nextworker, mode);
    return (0);
}
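
/*
 * Illustrative sketch (not part of this file): an application reaches
 * _aio_rw() through the standard POSIX entry points in this library,
 * e.g. aio_read(), which submits with mode AIOAREAD and the
 * AIO_KAIO | AIO_NO_DUPS flags.  A minimal caller, with fd assumed to
 * be an open file descriptor and error handling elided:
 *
 *      #include <aio.h>
 *      #include <errno.h>
 *
 *      struct aiocb cb = { 0 };
 *      char buf[4096];
 *      const struct aiocb *list[1];
 *
 *      cb.aio_fildes = fd;
 *      cb.aio_buf = buf;
 *      cb.aio_nbytes = sizeof (buf);
 *      cb.aio_offset = (off_t)0;
 *      cb.aio_sigevent.sigev_notify = SIGEV_NONE;
 *
 *      if (aio_read(&cb) == 0) {
 *          list[0] = &cb;
 *          (void) aio_suspend(list, 1, NULL);
 *          if (aio_error(&cb) == 0)
 *               ... aio_return(&cb) yields the byte count ...
 *      }
 */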

#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
    aio_req_t *reqp;
    aio_args_t *ap;
    int kerr;

    if (aiocbp == NULL) {
        errno = EINVAL;
        return (-1);
    }

    /* initialize kaio */
    if (!_kaio_ok)
        _kaio_init();

    aiocbp->aio_state = NOCHECK;

    /*
     * If we have been called because a list I/O
     * kaio() failed, we don't want to repeat the
     * system call.
     */
    if (flg & AIO_KAIO) {
        /*
         * Try kernel aio first.
         * If errno is ENOTSUP/EBADFD,
         * fall back to the thread implementation.
         */
        if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
            aiocbp->aio_resultp.aio_errno = EINPROGRESS;
            aiocbp->aio_state = CHECK;
            kerr = (int)_kaio(mode, aiocbp);
            if (kerr == 0)
                return (0);
            if (errno != ENOTSUP && errno != EBADFD) {
                aiocbp->aio_resultp.aio_errno = errno;
                aiocbp->aio_resultp.aio_return = -1;
                aiocbp->aio_state = NOCHECK;
                return (-1);
            }
            if (errno == EBADFD)
                SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
        }
    }

    aiocbp->aio_resultp.aio_errno = EINPROGRESS;
    aiocbp->aio_state = USERAIO;

    if (!__uaio_ok && __uaio_init() == -1)
        return (-1);

    if ((reqp = _aio_req_alloc()) == NULL) {
        errno = EAGAIN;
        return (-1);
    }

    /*
     * If this is a list I/O request, record the list head
     * in the aio request.
     */
    reqp->req_head = lio_head;
    reqp->req_type = AIO_POSIX_REQ;
    reqp->req_op = mode;
    reqp->req_largefile = 1;

    if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
        reqp->req_sigevent.sigev_notify = SIGEV_NONE;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
        reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
        reqp->req_sigevent.sigev_signo =
            aiocbp->aio_sigevent.sigev_signo;
        reqp->req_sigevent.sigev_value.sival_ptr =
            aiocbp->aio_sigevent.sigev_value.sival_ptr;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
        port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
        reqp->req_sigevent.sigev_notify = SIGEV_PORT;
        reqp->req_sigevent.sigev_signo =
            pn->portnfy_port;
        reqp->req_sigevent.sigev_value.sival_ptr =
            pn->portnfy_user;
    } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
        reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
        reqp->req_sigevent.sigev_signo =
            aiocbp->aio_sigevent.sigev_signo;
        reqp->req_sigevent.sigev_value.sival_ptr =
            aiocbp->aio_sigevent.sigev_value.sival_ptr;
    }

    reqp->req_resultp = &aiocbp->aio_resultp;
    reqp->req_aiocbp = aiocbp;
    ap = &reqp->req_args;
    ap->fd = aiocbp->aio_fildes;
    ap->buf = (caddr_t)aiocbp->aio_buf;
    ap->bufsz = aiocbp->aio_nbytes;
    ap->offset = aiocbp->aio_offset;

    if ((flg & AIO_NO_DUPS) &&
        _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
        aio_panic("_aio_rw64(): request already in hash table");
        _aio_req_free(reqp);
        errno = EINVAL;
        return (-1);
    }
    _aio_req_add(reqp, nextworker, mode);
    return (0);
}
#endif  /* !defined(_LP64) */
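
/*
 * Illustrative note (not part of this file): _aio_rw64() is compiled
 * only for 32-bit processes.  A 32-bit application built with the
 * transitional large-file environment reaches it through the
 * aio_read64()/aio_write64() family, and <aio.h> maps the unsuffixed
 * names onto those entry points when _FILE_OFFSET_BITS is 64, e.g.:
 *
 *      #define _FILE_OFFSET_BITS 64    (before any system header)
 *      #include <aio.h>
 *       ... aio_read(&cb) now submits through the 64-bit path,
 *       ... arriving here with req_largefile set to 1.
 *
 * In an LP64 process off_t is already 64 bits wide and _aio_rw()
 * above is the only path.
 */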